From b62638ddfcb7ca512718b71d2444e027563cd4a3 Mon Sep 17 00:00:00 2001 From: bryan31 Date: Tue, 9 Nov 2021 22:13:13 +0800 Subject: [PATCH] =?UTF-8?q?enhancement=20#I4HKZG=20=E5=80=9F=E9=89=B4async?= =?UTF-8?q?Tool=E5=AF=B9=E5=BC=82=E6=AD=A5=E7=BA=BF=E7=A8=8B=E5=BA=95?= =?UTF-8?q?=E5=B1=82=E8=BF=9B=E8=A1=8C=E4=BA=86=E5=BD=BB=E5=BA=95=E9=87=8D?= =?UTF-8?q?=E6=9E=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../yomahub/liteflow/entity/flow/Chain.java | 110 +++++++++++------- .../parallel/CompletableFutureTimeout.java | 75 ++++++++++++ .../flow/parallel/ParallelSupplier.java | 38 ++++++ .../entity/flow/parallel/WhenFutureObj.java | 74 ++++++++++++ 4 files changed, 255 insertions(+), 42 deletions(-) create mode 100644 liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/parallel/CompletableFutureTimeout.java create mode 100644 liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/parallel/ParallelSupplier.java create mode 100644 liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/parallel/WhenFutureObj.java diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/Chain.java b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/Chain.java index 7bea2b6d3..3f367f471 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/Chain.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/Chain.java @@ -11,12 +11,11 @@ package com.yomahub.liteflow.entity.flow; import cn.hutool.core.collection.CollUtil; import cn.hutool.core.util.StrUtil; import com.alibaba.ttl.threadpool.TtlExecutors; -import com.yomahub.liteflow.asynctool.executor.Async; -import com.yomahub.liteflow.asynctool.worker.ResultState; -import com.yomahub.liteflow.asynctool.worker.WorkResult; -import com.yomahub.liteflow.asynctool.wrapper.WorkerWrapper; import com.yomahub.liteflow.entity.data.DataBus; import com.yomahub.liteflow.entity.data.Slot; +import com.yomahub.liteflow.entity.flow.parallel.CompletableFutureTimeout; +import com.yomahub.liteflow.entity.flow.parallel.ParallelSupplier; +import com.yomahub.liteflow.entity.flow.parallel.WhenFutureObj; import com.yomahub.liteflow.enums.ExecuteTypeEnum; import com.yomahub.liteflow.exception.FlowSystemException; import com.yomahub.liteflow.exception.WhenExecuteException; @@ -95,63 +94,90 @@ public class Chain implements Executable { return chainName; } - //使用线程池执行when并发流程 + //这块涉及到挺多的多线程逻辑,所以注释比较详细,看到这里的童鞋可以仔细阅读 private void executeAsyncCondition(WhenCondition condition, Integer slotIndex, String requestId) throws Exception{ - //此方法其实只会初始化一次Executor,不会每次都会初始化。Executor是唯一的 ExecutorService parallelExecutor = TtlExecutors.getTtlExecutorService(ExecutorHelper.loadInstance().buildExecutor()); - + //获得liteflow的参数 LiteflowConfig liteflowConfig = LiteflowConfigGetter.get(); - //封装asyncTool的workerWrapper对象 - List> parallelWorkerWrapperList = condition.getNodeList().stream() - .map(executable -> new WorkerWrapper.Builder() - .worker(new ParallelWorker(executable, slotIndex)) - .next(new WorkerWrapper.Builder().worker((object, allWrappers) -> Void.TYPE.newInstance()).build(), true) - .build()) - .collect(Collectors.toList()); + //定义是否中断参数 + //这里为什么要定义成数组呢,因为后面lumbda要用到,根据final不能修改引用的原则,这里用了数组对象 + final boolean[] interrupted = {false}; - boolean interrupted = false; - boolean asyncToolResult; + //这里主要是做了封装CompletableFuture对象,用lumbda表达式做了很多事情,这句代码要仔细理清 + //1.根据condition.getNodeList()的集合进行流处理,用map进行把executable对象转换成List> + //2.在转的过程中,套入CompletableFutureTimeout方法进行超时判断,如果超时则用WhenFutureObj.timeOut返回超时的对象 + //3.第2个参数是主要的本体CompletableFuture,传入了ParallelSupplier和线程池对象 + List> completableFutureList = condition.getNodeList().stream().map( + executable -> CompletableFutureTimeout.completeOnTimeout( + WhenFutureObj.timeOut(executable.getExecuteName()), + CompletableFuture.supplyAsync(new ParallelSupplier(executable, slotIndex, requestId), parallelExecutor), + liteflowConfig.getWhenMaxWaitSeconds(), + TimeUnit.SECONDS + ) + ).collect(Collectors.toList()); - //这里利用asyncTool框架进行并行调用 - try{ - asyncToolResult = Async.beginWork(liteflowConfig.getWhenMaxWaitSeconds()*1000, - parallelExecutor, - parallelWorkerWrapperList.toArray(new WorkerWrapper[]{})); - }catch (Exception e){ - throw new WhenExecuteException(StrUtil.format("requestId [{}] AsyncTool framework execution exception.", requestId)); + + CompletableFuture resultCompletableFuture; + + //这里判断执行方式 + //如果any为false,说明这些异步任务全部执行好或者超时,才返回 + //如果any为true,说明这些异步任务只要任意一个执行完成,就返回 + if(condition.isAny()){ + //把这些CompletableFuture通过anyOf合成一个CompletableFuture + resultCompletableFuture = CompletableFuture.anyOf(completableFutureList.toArray(new CompletableFuture[]{})); + }else{ + //把这些CompletableFuture通过allOf合成一个CompletableFuture + resultCompletableFuture = CompletableFuture.allOf(completableFutureList.toArray(new CompletableFuture[]{})); } - //asyncToolResult为false,说明是timeout状态了 - //遍历wrapper拿到worker,拿到defaultValue,其实就是nodeId,打印出来 - if (!asyncToolResult){ - parallelWorkerWrapperList.forEach(workerWrapper -> { - if(workerWrapper.getWorkResult().getResultState().equals(ResultState.TIMEOUT)){ - LOG.warn("requestId [{}] executing thread has reached max-wait-seconds, thread canceled.Execute-item: [{}]", - requestId, workerWrapper.getWorker().defaultValue()); - } - }); - interrupted = true; + try { + //进行执行,这句执行完后,就意味着所有的任务要么执行完毕,要么超时返回 + resultCompletableFuture.get(); + } catch (InterruptedException | ExecutionException e) { + LOG.error("there was an error when executing the CompletableFuture",e); + interrupted[0] = true; } - //errorResume是一个condition里的参数,如果为true,表示即便出现了错误,也继续执行下一个condition - //当配置了errorResume = false,出现interrupted或者其中一个线程执行出错的情况,将抛出WhenExecuteException + //拿到已经完成的CompletableFuture + //如果any为false,那么所有任务都已经完成 + //如果any为true,那么这里拿到的是第一个完成的任务 + //这里过滤和转换一起用lumbda做了 + List allCompletableWhenFutureObjList = completableFutureList.stream().filter(CompletableFuture::isDone).map(f -> { + try { + return f.get(); + } catch (InterruptedException | ExecutionException e) { + interrupted[0] = true; + return null; + } + }).collect(Collectors.toList()); + + //判断超时,上面已经拿到了所有已经完成的CompletableFuture + //那我们只要过滤出超时的CompletableFuture + List timeOutWhenFutureObjList = allCompletableWhenFutureObjList.stream().filter(WhenFutureObj::isTimeout).collect(Collectors.toList()); + + //输出超时信息 + timeOutWhenFutureObjList.forEach(whenFutureObj -> + LOG.warn("requestId [{}] executing thread has reached max-wait-seconds, thread canceled.Execute-item: [{}]", requestId, whenFutureObj.getExecutorName())); + + //当配置了errorResume = false,出现interrupted或者!f.get()的情况,将抛出WhenExecuteException if (!condition.isErrorResume()) { - if (interrupted) { + if (interrupted[0]) { throw new WhenExecuteException(StrUtil.format("requestId [{}] when execute interrupted. errorResume [false].", requestId)); } - for (WorkerWrapper workerWrapper : parallelWorkerWrapperList){ - WorkResult workResult = workerWrapper.getWorkResult(); - if (!workResult.getResultState().equals(ResultState.SUCCESS)){ - throw workResult.getEx(); + //循环判断CompletableFuture的返回值,如果异步执行失败,则抛出相应的业务异常 + for(WhenFutureObj whenFutureObj : allCompletableWhenFutureObjList){ + if (!whenFutureObj.isSuccess()){ + LOG.info(StrUtil.format("requestId [{}] when-executor[{}] execute failed. errorResume [false].", whenFutureObj.getExecutorName(), requestId)); + throw whenFutureObj.getEx(); } } - } else if (interrupted) { - // 这里由于配置了errorResume=true,所以只打印warn日志 + } else if (interrupted[0]) { + // 这里由于配置了errorResume,所以只打印warn日志 LOG.warn("requestId [{}] executing when condition timeout , but ignore with errorResume.", requestId); } } diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/parallel/CompletableFutureTimeout.java b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/parallel/CompletableFutureTimeout.java new file mode 100644 index 000000000..387c0fd59 --- /dev/null +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/parallel/CompletableFutureTimeout.java @@ -0,0 +1,75 @@ +package com.yomahub.liteflow.entity.flow.parallel; + +import java.util.concurrent.*; +import java.util.function.Function; + +/** + * java8中CompletableFuture异步处理超时的方法 + * + * Java 8 的 CompletableFuture 并没有 timeout 机制,虽然可以在 get 的时候指定 timeout,是一个同步堵塞的操作。怎样让 timeout 也是异步的呢?Java 8 内有内建的机 + * 制支持,一般的实现方案是启动一个 ScheduledThreadpoolExecutor 线程在 timeout 时间后直接调用 CompletableFuture.completeExceptionally(new TimeoutException()), + * 然后用acceptEither() 或者 applyToEither 看是先计算完成还是先超时: + * + * 在 java 9 引入了 orTimeout 和 completeOnTimeOut 两个方法支持 异步 timeout 机制: + * + * public CompletableFuture orTimeout(long timeout, TimeUnit unit) : completes the CompletableFuture with a TimeoutException after the specified timeout has elapsed. + * public CompletableFuture completeOnTimeout(T value, long timeout, TimeUnit unit) : provides a default value in the case that the CompletableFuture pipeline times out. + * 内部实现上跟我们上面的实现方案是一模一样的,只是现在不需要自己实现了。 + * + * 实际上hystrix等熔断的框架,其实现线程Timeout之后就关闭线程,也是基于同样的道理,所以我们可以看到hystrix中会有一个Timer Thread + * + * + * @author luliang + * @since 2.6.4 + */ +public class CompletableFutureTimeout { + /** + * Singleton delay scheduler, used only for starting and * cancelling tasks. + */ + static final class Delayer { + static ScheduledFuture delay(Runnable command, long delay, TimeUnit unit) { + return delayer.schedule(command, delay, unit); + } + + static final class DaemonThreadFactory implements ThreadFactory { + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(r); + t.setDaemon(true); + t.setName("CompletableFutureDelayScheduler"); + return t; + } + } + + static final ScheduledThreadPoolExecutor delayer; + + // 注意,这里使用一个线程就可以搞定 因为这个线程并不真的执行请求 而是仅仅抛出一个异常 + static { + delayer = new ScheduledThreadPoolExecutor(1, new CompletableFutureTimeout.Delayer.DaemonThreadFactory()); + delayer.setRemoveOnCancelPolicy(true); + } + } + + public static CompletableFuture timeoutAfter(long timeout, TimeUnit unit) { + CompletableFuture result = new CompletableFuture(); + // timeout 时间后 抛出TimeoutException 类似于sentinel / watcher + CompletableFutureTimeout.Delayer.delayer.schedule(() -> result.completeExceptionally(new TimeoutException()), timeout, unit); + return result; + } + + /** + * 哪个先完成 就apply哪一个结果 这是一个关键的API,exceptionally出现异常后返回默认值 + */ + public static CompletableFuture completeOnTimeout(T t, CompletableFuture future, long timeout, TimeUnit unit) { + final CompletableFuture timeoutFuture = timeoutAfter(timeout, unit); + return future.applyToEither(timeoutFuture, Function.identity()).exceptionally((throwable) -> t); + } + + /** + * 哪个先完成 就apply哪一个结果 这是一个关键的API,不设置默认值,超时后抛出异常 + */ + public static CompletableFuture orTimeout(T t, CompletableFuture future, long timeout, TimeUnit unit) { + final CompletableFuture timeoutFuture = timeoutAfter(timeout, unit); + return future.applyToEither(timeoutFuture, Function.identity()).exceptionally((throwable) -> t); + } +} diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/parallel/ParallelSupplier.java b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/parallel/ParallelSupplier.java new file mode 100644 index 000000000..74bca2cff --- /dev/null +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/parallel/ParallelSupplier.java @@ -0,0 +1,38 @@ +package com.yomahub.liteflow.entity.flow.parallel; + +import com.yomahub.liteflow.entity.flow.Executable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.util.function.Supplier; + +/** + * 并行异步worker对象,提供给CompletableFuture用 + * @author Bryan.Zhang + * @since 2.6.4 + */ +public class ParallelSupplier implements Supplier { + + private static final Logger LOG = LoggerFactory.getLogger(ParallelSupplier.class); + + private final Executable executableItem; + + private final Integer slotIndex; + + private final String requestId; + + public ParallelSupplier(Executable executableItem, Integer slotIndex, String requestId) { + this.executableItem = executableItem; + this.slotIndex = slotIndex; + this.requestId = requestId; + } + + @Override + public WhenFutureObj get() { + try { + executableItem.execute(slotIndex); + return WhenFutureObj.success(executableItem.getExecuteName()); + } catch (Exception e){ + return WhenFutureObj.fail(executableItem.getExecuteName(), e); + } + } +} diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/parallel/WhenFutureObj.java b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/parallel/WhenFutureObj.java new file mode 100644 index 000000000..f36859ebe --- /dev/null +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/parallel/WhenFutureObj.java @@ -0,0 +1,74 @@ +package com.yomahub.liteflow.entity.flow.parallel; + +/** + * 并行异步CompletableFuture里的值对象 + * @author Bryan.Zhang + * @since 2.6.4 + */ +public class WhenFutureObj { + + private boolean success; + + private boolean timeout; + + private String executorName; + + private Exception ex; + + public static WhenFutureObj success(String executorName){ + WhenFutureObj result = new WhenFutureObj(); + result.setSuccess(true); + result.setTimeout(false); + result.setExecutorName(executorName); + return result; + } + + public static WhenFutureObj fail(String executorName, Exception ex){ + WhenFutureObj result = new WhenFutureObj(); + result.setSuccess(false); + result.setTimeout(false); + result.setExecutorName(executorName); + result.setEx(ex); + return result; + } + + public static WhenFutureObj timeOut(String executorName){ + WhenFutureObj result = new WhenFutureObj(); + result.setSuccess(false); + result.setTimeout(true); + result.setExecutorName(executorName); + return result; + } + + public boolean isSuccess() { + return success; + } + + public void setSuccess(boolean success) { + this.success = success; + } + + public String getExecutorName() { + return executorName; + } + + public void setExecutorName(String executorName) { + this.executorName = executorName; + } + + public Exception getEx() { + return ex; + } + + public void setEx(Exception ex) { + this.ex = ex; + } + + public boolean isTimeout() { + return timeout; + } + + public void setTimeout(boolean timeout) { + this.timeout = timeout; + } +}