mirror of
https://gitee.com/dromara/liteFlow.git
synced 2026-05-14 12:12:08 +08:00
enhancement #I4HKZG 借鉴asyncTool对异步线程底层进行了彻底重构
This commit is contained in:
@@ -11,12 +11,11 @@ package com.yomahub.liteflow.entity.flow;
|
||||
import cn.hutool.core.collection.CollUtil;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import com.alibaba.ttl.threadpool.TtlExecutors;
|
||||
import com.yomahub.liteflow.asynctool.executor.Async;
|
||||
import com.yomahub.liteflow.asynctool.worker.ResultState;
|
||||
import com.yomahub.liteflow.asynctool.worker.WorkResult;
|
||||
import com.yomahub.liteflow.asynctool.wrapper.WorkerWrapper;
|
||||
import com.yomahub.liteflow.entity.data.DataBus;
|
||||
import com.yomahub.liteflow.entity.data.Slot;
|
||||
import com.yomahub.liteflow.entity.flow.parallel.CompletableFutureTimeout;
|
||||
import com.yomahub.liteflow.entity.flow.parallel.ParallelSupplier;
|
||||
import com.yomahub.liteflow.entity.flow.parallel.WhenFutureObj;
|
||||
import com.yomahub.liteflow.enums.ExecuteTypeEnum;
|
||||
import com.yomahub.liteflow.exception.FlowSystemException;
|
||||
import com.yomahub.liteflow.exception.WhenExecuteException;
|
||||
@@ -95,63 +94,90 @@ public class Chain implements Executable {
|
||||
return chainName;
|
||||
}
|
||||
|
||||
|
||||
//使用线程池执行when并发流程
|
||||
//这块涉及到挺多的多线程逻辑,所以注释比较详细,看到这里的童鞋可以仔细阅读
|
||||
private void executeAsyncCondition(WhenCondition condition, Integer slotIndex, String requestId) throws Exception{
|
||||
|
||||
//此方法其实只会初始化一次Executor,不会每次都会初始化。Executor是唯一的
|
||||
ExecutorService parallelExecutor = TtlExecutors.getTtlExecutorService(ExecutorHelper.loadInstance().buildExecutor());
|
||||
|
||||
|
||||
//获得liteflow的参数
|
||||
LiteflowConfig liteflowConfig = LiteflowConfigGetter.get();
|
||||
|
||||
//封装asyncTool的workerWrapper对象
|
||||
List<WorkerWrapper<Void, String>> parallelWorkerWrapperList = condition.getNodeList().stream()
|
||||
.map(executable -> new WorkerWrapper.Builder<Void, String>()
|
||||
.worker(new ParallelWorker(executable, slotIndex))
|
||||
.next(new WorkerWrapper.Builder<Void, Void>().worker((object, allWrappers) -> Void.TYPE.newInstance()).build(), true)
|
||||
.build())
|
||||
.collect(Collectors.toList());
|
||||
//定义是否中断参数
|
||||
//这里为什么要定义成数组呢,因为后面lumbda要用到,根据final不能修改引用的原则,这里用了数组对象
|
||||
final boolean[] interrupted = {false};
|
||||
|
||||
boolean interrupted = false;
|
||||
boolean asyncToolResult;
|
||||
//这里主要是做了封装CompletableFuture对象,用lumbda表达式做了很多事情,这句代码要仔细理清
|
||||
//1.根据condition.getNodeList()的集合进行流处理,用map进行把executable对象转换成List<CompletableFuture<WhenFutureObj>>
|
||||
//2.在转的过程中,套入CompletableFutureTimeout方法进行超时判断,如果超时则用WhenFutureObj.timeOut返回超时的对象
|
||||
//3.第2个参数是主要的本体CompletableFuture,传入了ParallelSupplier和线程池对象
|
||||
List<CompletableFuture<WhenFutureObj>> completableFutureList = condition.getNodeList().stream().map(
|
||||
executable -> CompletableFutureTimeout.completeOnTimeout(
|
||||
WhenFutureObj.timeOut(executable.getExecuteName()),
|
||||
CompletableFuture.supplyAsync(new ParallelSupplier(executable, slotIndex, requestId), parallelExecutor),
|
||||
liteflowConfig.getWhenMaxWaitSeconds(),
|
||||
TimeUnit.SECONDS
|
||||
)
|
||||
).collect(Collectors.toList());
|
||||
|
||||
//这里利用asyncTool框架进行并行调用
|
||||
try{
|
||||
asyncToolResult = Async.beginWork(liteflowConfig.getWhenMaxWaitSeconds()*1000,
|
||||
parallelExecutor,
|
||||
parallelWorkerWrapperList.toArray(new WorkerWrapper[]{}));
|
||||
}catch (Exception e){
|
||||
throw new WhenExecuteException(StrUtil.format("requestId [{}] AsyncTool framework execution exception.", requestId));
|
||||
|
||||
CompletableFuture<?> resultCompletableFuture;
|
||||
|
||||
//这里判断执行方式
|
||||
//如果any为false,说明这些异步任务全部执行好或者超时,才返回
|
||||
//如果any为true,说明这些异步任务只要任意一个执行完成,就返回
|
||||
if(condition.isAny()){
|
||||
//把这些CompletableFuture通过anyOf合成一个CompletableFuture
|
||||
resultCompletableFuture = CompletableFuture.anyOf(completableFutureList.toArray(new CompletableFuture[]{}));
|
||||
}else{
|
||||
//把这些CompletableFuture通过allOf合成一个CompletableFuture
|
||||
resultCompletableFuture = CompletableFuture.allOf(completableFutureList.toArray(new CompletableFuture[]{}));
|
||||
}
|
||||
|
||||
//asyncToolResult为false,说明是timeout状态了
|
||||
//遍历wrapper拿到worker,拿到defaultValue,其实就是nodeId,打印出来
|
||||
if (!asyncToolResult){
|
||||
parallelWorkerWrapperList.forEach(workerWrapper -> {
|
||||
if(workerWrapper.getWorkResult().getResultState().equals(ResultState.TIMEOUT)){
|
||||
LOG.warn("requestId [{}] executing thread has reached max-wait-seconds, thread canceled.Execute-item: [{}]",
|
||||
requestId, workerWrapper.getWorker().defaultValue());
|
||||
}
|
||||
});
|
||||
interrupted = true;
|
||||
try {
|
||||
//进行执行,这句执行完后,就意味着所有的任务要么执行完毕,要么超时返回
|
||||
resultCompletableFuture.get();
|
||||
} catch (InterruptedException | ExecutionException e) {
|
||||
LOG.error("there was an error when executing the CompletableFuture",e);
|
||||
interrupted[0] = true;
|
||||
}
|
||||
|
||||
//errorResume是一个condition里的参数,如果为true,表示即便出现了错误,也继续执行下一个condition
|
||||
//当配置了errorResume = false,出现interrupted或者其中一个线程执行出错的情况,将抛出WhenExecuteException
|
||||
//拿到已经完成的CompletableFuture
|
||||
//如果any为false,那么所有任务都已经完成
|
||||
//如果any为true,那么这里拿到的是第一个完成的任务
|
||||
//这里过滤和转换一起用lumbda做了
|
||||
List<WhenFutureObj> allCompletableWhenFutureObjList = completableFutureList.stream().filter(CompletableFuture::isDone).map(f -> {
|
||||
try {
|
||||
return f.get();
|
||||
} catch (InterruptedException | ExecutionException e) {
|
||||
interrupted[0] = true;
|
||||
return null;
|
||||
}
|
||||
}).collect(Collectors.toList());
|
||||
|
||||
//判断超时,上面已经拿到了所有已经完成的CompletableFuture
|
||||
//那我们只要过滤出超时的CompletableFuture
|
||||
List<WhenFutureObj> timeOutWhenFutureObjList = allCompletableWhenFutureObjList.stream().filter(WhenFutureObj::isTimeout).collect(Collectors.toList());
|
||||
|
||||
//输出超时信息
|
||||
timeOutWhenFutureObjList.forEach(whenFutureObj ->
|
||||
LOG.warn("requestId [{}] executing thread has reached max-wait-seconds, thread canceled.Execute-item: [{}]", requestId, whenFutureObj.getExecutorName()));
|
||||
|
||||
//当配置了errorResume = false,出现interrupted或者!f.get()的情况,将抛出WhenExecuteException
|
||||
if (!condition.isErrorResume()) {
|
||||
if (interrupted) {
|
||||
if (interrupted[0]) {
|
||||
throw new WhenExecuteException(StrUtil.format("requestId [{}] when execute interrupted. errorResume [false].", requestId));
|
||||
}
|
||||
|
||||
for (WorkerWrapper<Void, String> workerWrapper : parallelWorkerWrapperList){
|
||||
WorkResult<String> workResult = workerWrapper.getWorkResult();
|
||||
if (!workResult.getResultState().equals(ResultState.SUCCESS)){
|
||||
throw workResult.getEx();
|
||||
//循环判断CompletableFuture的返回值,如果异步执行失败,则抛出相应的业务异常
|
||||
for(WhenFutureObj whenFutureObj : allCompletableWhenFutureObjList){
|
||||
if (!whenFutureObj.isSuccess()){
|
||||
LOG.info(StrUtil.format("requestId [{}] when-executor[{}] execute failed. errorResume [false].", whenFutureObj.getExecutorName(), requestId));
|
||||
throw whenFutureObj.getEx();
|
||||
}
|
||||
}
|
||||
} else if (interrupted) {
|
||||
// 这里由于配置了errorResume=true,所以只打印warn日志
|
||||
} else if (interrupted[0]) {
|
||||
// 这里由于配置了errorResume,所以只打印warn日志
|
||||
LOG.warn("requestId [{}] executing when condition timeout , but ignore with errorResume.", requestId);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,75 @@
|
||||
package com.yomahub.liteflow.entity.flow.parallel;
|
||||
|
||||
import java.util.concurrent.*;
|
||||
import java.util.function.Function;
|
||||
|
||||
/**
|
||||
* java8中CompletableFuture异步处理超时的方法
|
||||
*
|
||||
* Java 8 的 CompletableFuture 并没有 timeout 机制,虽然可以在 get 的时候指定 timeout,是一个同步堵塞的操作。怎样让 timeout 也是异步的呢?Java 8 内有内建的机
|
||||
* 制支持,一般的实现方案是启动一个 ScheduledThreadpoolExecutor 线程在 timeout 时间后直接调用 CompletableFuture.completeExceptionally(new TimeoutException()),
|
||||
* 然后用acceptEither() 或者 applyToEither 看是先计算完成还是先超时:
|
||||
*
|
||||
* 在 java 9 引入了 orTimeout 和 completeOnTimeOut 两个方法支持 异步 timeout 机制:
|
||||
*
|
||||
* public CompletableFuture orTimeout(long timeout, TimeUnit unit) : completes the CompletableFuture with a TimeoutException after the specified timeout has elapsed.
|
||||
* public CompletableFuture completeOnTimeout(T value, long timeout, TimeUnit unit) : provides a default value in the case that the CompletableFuture pipeline times out.
|
||||
* 内部实现上跟我们上面的实现方案是一模一样的,只是现在不需要自己实现了。
|
||||
*
|
||||
* 实际上hystrix等熔断的框架,其实现线程Timeout之后就关闭线程,也是基于同样的道理,所以我们可以看到hystrix中会有一个Timer Thread
|
||||
*
|
||||
*
|
||||
* @author luliang
|
||||
* @since 2.6.4
|
||||
*/
|
||||
public class CompletableFutureTimeout {
|
||||
/**
|
||||
* Singleton delay scheduler, used only for starting and * cancelling tasks.
|
||||
*/
|
||||
static final class Delayer {
|
||||
static ScheduledFuture<?> delay(Runnable command, long delay, TimeUnit unit) {
|
||||
return delayer.schedule(command, delay, unit);
|
||||
}
|
||||
|
||||
static final class DaemonThreadFactory implements ThreadFactory {
|
||||
@Override
|
||||
public Thread newThread(Runnable r) {
|
||||
Thread t = new Thread(r);
|
||||
t.setDaemon(true);
|
||||
t.setName("CompletableFutureDelayScheduler");
|
||||
return t;
|
||||
}
|
||||
}
|
||||
|
||||
static final ScheduledThreadPoolExecutor delayer;
|
||||
|
||||
// 注意,这里使用一个线程就可以搞定 因为这个线程并不真的执行请求 而是仅仅抛出一个异常
|
||||
static {
|
||||
delayer = new ScheduledThreadPoolExecutor(1, new CompletableFutureTimeout.Delayer.DaemonThreadFactory());
|
||||
delayer.setRemoveOnCancelPolicy(true);
|
||||
}
|
||||
}
|
||||
|
||||
public static <T> CompletableFuture<T> timeoutAfter(long timeout, TimeUnit unit) {
|
||||
CompletableFuture<T> result = new CompletableFuture<T>();
|
||||
// timeout 时间后 抛出TimeoutException 类似于sentinel / watcher
|
||||
CompletableFutureTimeout.Delayer.delayer.schedule(() -> result.completeExceptionally(new TimeoutException()), timeout, unit);
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* 哪个先完成 就apply哪一个结果 这是一个关键的API,exceptionally出现异常后返回默认值
|
||||
*/
|
||||
public static <T> CompletableFuture<T> completeOnTimeout(T t, CompletableFuture<T> future, long timeout, TimeUnit unit) {
|
||||
final CompletableFuture<T> timeoutFuture = timeoutAfter(timeout, unit);
|
||||
return future.applyToEither(timeoutFuture, Function.identity()).exceptionally((throwable) -> t);
|
||||
}
|
||||
|
||||
/**
|
||||
* 哪个先完成 就apply哪一个结果 这是一个关键的API,不设置默认值,超时后抛出异常
|
||||
*/
|
||||
public static <T> CompletableFuture<T> orTimeout(T t, CompletableFuture<T> future, long timeout, TimeUnit unit) {
|
||||
final CompletableFuture<T> timeoutFuture = timeoutAfter(timeout, unit);
|
||||
return future.applyToEither(timeoutFuture, Function.identity()).exceptionally((throwable) -> t);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,38 @@
|
||||
package com.yomahub.liteflow.entity.flow.parallel;
|
||||
|
||||
import com.yomahub.liteflow.entity.flow.Executable;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
/**
|
||||
* 并行异步worker对象,提供给CompletableFuture用
|
||||
* @author Bryan.Zhang
|
||||
* @since 2.6.4
|
||||
*/
|
||||
public class ParallelSupplier implements Supplier<WhenFutureObj> {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(ParallelSupplier.class);
|
||||
|
||||
private final Executable executableItem;
|
||||
|
||||
private final Integer slotIndex;
|
||||
|
||||
private final String requestId;
|
||||
|
||||
public ParallelSupplier(Executable executableItem, Integer slotIndex, String requestId) {
|
||||
this.executableItem = executableItem;
|
||||
this.slotIndex = slotIndex;
|
||||
this.requestId = requestId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public WhenFutureObj get() {
|
||||
try {
|
||||
executableItem.execute(slotIndex);
|
||||
return WhenFutureObj.success(executableItem.getExecuteName());
|
||||
} catch (Exception e){
|
||||
return WhenFutureObj.fail(executableItem.getExecuteName(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,74 @@
|
||||
package com.yomahub.liteflow.entity.flow.parallel;
|
||||
|
||||
/**
|
||||
* 并行异步CompletableFuture里的值对象
|
||||
* @author Bryan.Zhang
|
||||
* @since 2.6.4
|
||||
*/
|
||||
public class WhenFutureObj {
|
||||
|
||||
private boolean success;
|
||||
|
||||
private boolean timeout;
|
||||
|
||||
private String executorName;
|
||||
|
||||
private Exception ex;
|
||||
|
||||
public static WhenFutureObj success(String executorName){
|
||||
WhenFutureObj result = new WhenFutureObj();
|
||||
result.setSuccess(true);
|
||||
result.setTimeout(false);
|
||||
result.setExecutorName(executorName);
|
||||
return result;
|
||||
}
|
||||
|
||||
public static WhenFutureObj fail(String executorName, Exception ex){
|
||||
WhenFutureObj result = new WhenFutureObj();
|
||||
result.setSuccess(false);
|
||||
result.setTimeout(false);
|
||||
result.setExecutorName(executorName);
|
||||
result.setEx(ex);
|
||||
return result;
|
||||
}
|
||||
|
||||
public static WhenFutureObj timeOut(String executorName){
|
||||
WhenFutureObj result = new WhenFutureObj();
|
||||
result.setSuccess(false);
|
||||
result.setTimeout(true);
|
||||
result.setExecutorName(executorName);
|
||||
return result;
|
||||
}
|
||||
|
||||
public boolean isSuccess() {
|
||||
return success;
|
||||
}
|
||||
|
||||
public void setSuccess(boolean success) {
|
||||
this.success = success;
|
||||
}
|
||||
|
||||
public String getExecutorName() {
|
||||
return executorName;
|
||||
}
|
||||
|
||||
public void setExecutorName(String executorName) {
|
||||
this.executorName = executorName;
|
||||
}
|
||||
|
||||
public Exception getEx() {
|
||||
return ex;
|
||||
}
|
||||
|
||||
public void setEx(Exception ex) {
|
||||
this.ex = ex;
|
||||
}
|
||||
|
||||
public boolean isTimeout() {
|
||||
return timeout;
|
||||
}
|
||||
|
||||
public void setTimeout(boolean timeout) {
|
||||
this.timeout = timeout;
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user