From e84ae7301804c39cc6c024d892213ce6f7a3ceb2 Mon Sep 17 00:00:00 2001 From: bryan31 Date: Mon, 8 Nov 2021 13:48:39 +0800 Subject: [PATCH] =?UTF-8?q?enhancement=20#I3DM92=20=E9=9B=86=E6=88=90async?= =?UTF-8?q?Tool=E4=BD=9C=E4=B8=BA=E7=BA=BF=E7=A8=8B=E7=BC=96=E6=8E=92?= =?UTF-8?q?=E7=9A=84=E6=A0=B8=E5=BF=83?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- liteflow-async-tool/pom.xml | 19 + .../asynctool/callback/DefaultCallback.java | 20 + .../callback/DefaultGroupCallback.java | 21 + .../asynctool/callback/ICallback.java | 26 + .../asynctool/callback/IGroupCallback.java | 20 + .../asynctool/callback/ITimeoutWorker.java | 20 + .../liteflow/asynctool/callback/IWorker.java | 30 + .../asynctool/exception/SkippedException.java | 16 + .../liteflow/asynctool/executor/Async.java | 160 +++++ .../asynctool/executor/timer/SystemClock.java | 51 ++ .../asynctool/worker/DependWrapper.java | 58 ++ .../asynctool/worker/ResultState.java | 12 + .../liteflow/asynctool/worker/WorkResult.java | 63 ++ .../asynctool/wrapper/WorkerWrapper.java | 610 ++++++++++++++++++ liteflow-core/pom.xml | 5 + pom.xml | 17 +- 16 files changed, 1135 insertions(+), 13 deletions(-) create mode 100644 liteflow-async-tool/pom.xml create mode 100755 liteflow-async-tool/src/main/java/com/yomahub/liteflow/asynctool/callback/DefaultCallback.java create mode 100644 liteflow-async-tool/src/main/java/com/yomahub/liteflow/asynctool/callback/DefaultGroupCallback.java create mode 100755 liteflow-async-tool/src/main/java/com/yomahub/liteflow/asynctool/callback/ICallback.java create mode 100755 liteflow-async-tool/src/main/java/com/yomahub/liteflow/asynctool/callback/IGroupCallback.java create mode 100644 liteflow-async-tool/src/main/java/com/yomahub/liteflow/asynctool/callback/ITimeoutWorker.java create mode 100755 liteflow-async-tool/src/main/java/com/yomahub/liteflow/asynctool/callback/IWorker.java create mode 100644 liteflow-async-tool/src/main/java/com/yomahub/liteflow/asynctool/exception/SkippedException.java create mode 100644 liteflow-async-tool/src/main/java/com/yomahub/liteflow/asynctool/executor/Async.java create mode 100644 liteflow-async-tool/src/main/java/com/yomahub/liteflow/asynctool/executor/timer/SystemClock.java create mode 100644 liteflow-async-tool/src/main/java/com/yomahub/liteflow/asynctool/worker/DependWrapper.java create mode 100755 liteflow-async-tool/src/main/java/com/yomahub/liteflow/asynctool/worker/ResultState.java create mode 100755 liteflow-async-tool/src/main/java/com/yomahub/liteflow/asynctool/worker/WorkResult.java create mode 100755 liteflow-async-tool/src/main/java/com/yomahub/liteflow/asynctool/wrapper/WorkerWrapper.java diff --git a/liteflow-async-tool/pom.xml b/liteflow-async-tool/pom.xml new file mode 100644 index 000000000..b64aadba6 --- /dev/null +++ b/liteflow-async-tool/pom.xml @@ -0,0 +1,19 @@ + + + + liteflow + com.yomahub + 2.6.4 + + 4.0.0 + + liteflow-async-tool + + + 8 + 8 + + + \ No newline at end of file diff --git a/liteflow-async-tool/src/main/java/com/yomahub/liteflow/asynctool/callback/DefaultCallback.java b/liteflow-async-tool/src/main/java/com/yomahub/liteflow/asynctool/callback/DefaultCallback.java new file mode 100755 index 000000000..6f5db0cbb --- /dev/null +++ b/liteflow-async-tool/src/main/java/com/yomahub/liteflow/asynctool/callback/DefaultCallback.java @@ -0,0 +1,20 @@ +package com.yomahub.liteflow.asynctool.callback; + +import com.yomahub.liteflow.asynctool.worker.WorkResult; + +/** + * 默认回调类,如果不设置的话,会默认给这个回调 + * @author wuweifeng wrote on 2019-11-19. + */ +public class DefaultCallback implements ICallback { + @Override + public void begin() { + + } + + @Override + public void result(boolean success, T param, WorkResult workResult) { + + } + +} diff --git a/liteflow-async-tool/src/main/java/com/yomahub/liteflow/asynctool/callback/DefaultGroupCallback.java b/liteflow-async-tool/src/main/java/com/yomahub/liteflow/asynctool/callback/DefaultGroupCallback.java new file mode 100644 index 000000000..edec1bbb5 --- /dev/null +++ b/liteflow-async-tool/src/main/java/com/yomahub/liteflow/asynctool/callback/DefaultGroupCallback.java @@ -0,0 +1,21 @@ +package com.yomahub.liteflow.asynctool.callback; + +import com.yomahub.liteflow.asynctool.wrapper.WorkerWrapper; + +import java.util.List; + +/** + * @author wuweifeng wrote on 2019-12-27 + * @version 1.0 + */ +public class DefaultGroupCallback implements IGroupCallback { + @Override + public void success(List workerWrappers) { + + } + + @Override + public void failure(List workerWrappers, Exception e) { + + } +} diff --git a/liteflow-async-tool/src/main/java/com/yomahub/liteflow/asynctool/callback/ICallback.java b/liteflow-async-tool/src/main/java/com/yomahub/liteflow/asynctool/callback/ICallback.java new file mode 100755 index 000000000..4576a9221 --- /dev/null +++ b/liteflow-async-tool/src/main/java/com/yomahub/liteflow/asynctool/callback/ICallback.java @@ -0,0 +1,26 @@ +package com.yomahub.liteflow.asynctool.callback; + + +import com.yomahub.liteflow.asynctool.worker.WorkResult; + +/** + * 每个执行单元执行完毕后,会回调该接口

+ * 需要监听执行结果的,实现该接口即可 + * + * @author wuweifeng wrote on 2019-11-19. + */ +@FunctionalInterface +public interface ICallback { + + /** + * 任务开始的监听 + */ + default void begin() { + + } + + /** + * 耗时操作执行完毕后,就给value注入值 + */ + void result(boolean success, T param, WorkResult workResult); +} diff --git a/liteflow-async-tool/src/main/java/com/yomahub/liteflow/asynctool/callback/IGroupCallback.java b/liteflow-async-tool/src/main/java/com/yomahub/liteflow/asynctool/callback/IGroupCallback.java new file mode 100755 index 000000000..77d8f4650 --- /dev/null +++ b/liteflow-async-tool/src/main/java/com/yomahub/liteflow/asynctool/callback/IGroupCallback.java @@ -0,0 +1,20 @@ +package com.yomahub.liteflow.asynctool.callback; + +import com.yomahub.liteflow.asynctool.wrapper.WorkerWrapper; + +import java.util.List; + +/** + * 如果是异步执行整组的话,可以用这个组回调。不推荐使用 + * @author wuweifeng wrote on 2019-11-19. + */ +public interface IGroupCallback { + /** + * 成功后,可以从wrapper里去getWorkResult + */ + void success(List workerWrappers); + /** + * 失败了,也可以从wrapper里去getWorkResult + */ + void failure(List workerWrappers, Exception e); +} diff --git a/liteflow-async-tool/src/main/java/com/yomahub/liteflow/asynctool/callback/ITimeoutWorker.java b/liteflow-async-tool/src/main/java/com/yomahub/liteflow/asynctool/callback/ITimeoutWorker.java new file mode 100644 index 000000000..de62611eb --- /dev/null +++ b/liteflow-async-tool/src/main/java/com/yomahub/liteflow/asynctool/callback/ITimeoutWorker.java @@ -0,0 +1,20 @@ +package com.yomahub.liteflow.asynctool.callback; + +/** + * @author wuweifeng wrote on 2019-12-20 + * @version 1.0 + */ +public interface ITimeoutWorker extends IWorker { + /** + * 每个worker都可以设置超时时间 + * @return 毫秒超时时间 + */ + long timeOut(); + + /** + * 是否开启单个执行单元的超时功能(有时是一个group设置个超时,而不具备关心单个worker的超时) + *

注意,如果开启了单个执行单元的超时检测,将使线程池数量多出一倍

+ * @return 是否开启 + */ + boolean enableTimeOut(); +} diff --git a/liteflow-async-tool/src/main/java/com/yomahub/liteflow/asynctool/callback/IWorker.java b/liteflow-async-tool/src/main/java/com/yomahub/liteflow/asynctool/callback/IWorker.java new file mode 100755 index 000000000..54ac7ff50 --- /dev/null +++ b/liteflow-async-tool/src/main/java/com/yomahub/liteflow/asynctool/callback/IWorker.java @@ -0,0 +1,30 @@ +package com.yomahub.liteflow.asynctool.callback; + +import com.yomahub.liteflow.asynctool.wrapper.WorkerWrapper; + +import java.util.Map; + +/** + * 每个最小执行单元需要实现该接口 + * + * @author wuweifeng wrote on 2019-11-19. + */ +@FunctionalInterface +public interface IWorker { + /** + * 在这里做耗时操作,如rpc请求、IO等 + * + * @param object object + * @param allWrappers 任务包装 + */ + V action(T object, Map allWrappers); + + /** + * 超时、异常时,返回的默认值 + * + * @return 默认值 + */ + default V defaultValue() { + return null; + } +} diff --git a/liteflow-async-tool/src/main/java/com/yomahub/liteflow/asynctool/exception/SkippedException.java b/liteflow-async-tool/src/main/java/com/yomahub/liteflow/asynctool/exception/SkippedException.java new file mode 100644 index 000000000..0afceaacb --- /dev/null +++ b/liteflow-async-tool/src/main/java/com/yomahub/liteflow/asynctool/exception/SkippedException.java @@ -0,0 +1,16 @@ +package com.yomahub.liteflow.asynctool.exception; + +/** + * 如果任务在执行之前,自己后面的任务已经执行完或正在被执行,则抛该exception + * @author wuweifeng wrote on 2020-02-18 + * @version 1.0 + */ +public class SkippedException extends RuntimeException { + public SkippedException() { + super(); + } + + public SkippedException(String message) { + super(message); + } +} diff --git a/liteflow-async-tool/src/main/java/com/yomahub/liteflow/asynctool/executor/Async.java b/liteflow-async-tool/src/main/java/com/yomahub/liteflow/asynctool/executor/Async.java new file mode 100644 index 000000000..d76b6b752 --- /dev/null +++ b/liteflow-async-tool/src/main/java/com/yomahub/liteflow/asynctool/executor/Async.java @@ -0,0 +1,160 @@ +package com.yomahub.liteflow.asynctool.executor; + +import com.yomahub.liteflow.asynctool.callback.DefaultGroupCallback; +import com.yomahub.liteflow.asynctool.callback.IGroupCallback; +import com.yomahub.liteflow.asynctool.wrapper.WorkerWrapper; + +import java.util.*; +import java.util.concurrent.*; +import java.util.stream.Collectors; + +/** + * 类入口,可以根据自己情况调整core线程的数量 + * @author wuweifeng wrote on 2019-12-18 + * @version 1.0 + */ +public class Async { + /** + * 默认线程池 + */ + private static final ThreadPoolExecutor COMMON_POOL = + new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors() * 2, 1024, + 15L, TimeUnit.SECONDS, + new LinkedBlockingQueue<>(), + (ThreadFactory) Thread::new); + /** + * 注意,这里是个static,也就是只能有一个线程池。用户自定义线程池时,也只能定义一个 + */ + private static ExecutorService executorService; + + /** + * 出发点 + */ + public static boolean beginWork(long timeout, ExecutorService executorService, List workerWrappers) throws ExecutionException, InterruptedException { + if(workerWrappers == null || workerWrappers.size() == 0) { + return false; + } + //保存线程池变量 + Async.executorService = executorService; + //定义一个map,存放所有的wrapper,key为wrapper的唯一id,value是该wrapper,可以从value中获取wrapper的result + Map forParamUseWrappers = new ConcurrentHashMap<>(); + CompletableFuture[] futures = new CompletableFuture[workerWrappers.size()]; + for (int i = 0; i < workerWrappers.size(); i++) { + WorkerWrapper wrapper = workerWrappers.get(i); + futures[i] = CompletableFuture.runAsync(() -> wrapper.work(executorService, timeout, forParamUseWrappers), executorService); + } + try { + CompletableFuture.allOf(futures).get(timeout, TimeUnit.MILLISECONDS); + return true; + } catch (TimeoutException e) { + Set set = new HashSet<>(); + totalWorkers(workerWrappers, set); + for (WorkerWrapper wrapper : set) { + wrapper.stopNow(); + } + return false; + } + } + + /** + * 如果想自定义线程池,请传pool。不自定义的话,就走默认的COMMON_POOL + */ + public static boolean beginWork(long timeout, ExecutorService executorService, WorkerWrapper... workerWrapper) throws ExecutionException, InterruptedException { + if(workerWrapper == null || workerWrapper.length == 0) { + return false; + } + List workerWrappers = Arrays.stream(workerWrapper).collect(Collectors.toList()); + return beginWork(timeout, executorService, workerWrappers); + } + + /** + * 同步阻塞,直到所有都完成,或失败 + */ + public static boolean beginWork(long timeout, WorkerWrapper... workerWrapper) throws ExecutionException, InterruptedException { + return beginWork(timeout, COMMON_POOL, workerWrapper); + } + + public static void beginWorkAsync(long timeout, IGroupCallback groupCallback, WorkerWrapper... workerWrapper) { + beginWorkAsync(timeout, COMMON_POOL, groupCallback, workerWrapper); + } + + /** + * 异步执行,直到所有都完成,或失败后,发起回调 + */ + public static void beginWorkAsync(long timeout, ExecutorService executorService, IGroupCallback groupCallback, WorkerWrapper... workerWrapper) { + if (groupCallback == null) { + groupCallback = new DefaultGroupCallback(); + } + IGroupCallback finalGroupCallback = groupCallback; + if (executorService != null) { + executorService.submit(() -> { + try { + boolean success = beginWork(timeout, executorService, workerWrapper); + if (success) { + finalGroupCallback.success(Arrays.asList(workerWrapper)); + } else { + finalGroupCallback.failure(Arrays.asList(workerWrapper), new TimeoutException()); + } + } catch (ExecutionException | InterruptedException e) { + e.printStackTrace(); + finalGroupCallback.failure(Arrays.asList(workerWrapper), e); + } + }); + } else { + COMMON_POOL.submit(() -> { + try { + boolean success = beginWork(timeout, COMMON_POOL, workerWrapper); + if (success) { + finalGroupCallback.success(Arrays.asList(workerWrapper)); + } else { + finalGroupCallback.failure(Arrays.asList(workerWrapper), new TimeoutException()); + } + } catch (ExecutionException | InterruptedException e) { + e.printStackTrace(); + finalGroupCallback.failure(Arrays.asList(workerWrapper), e); + } + }); + } + + } + + /** + * 总共多少个执行单元 + */ + @SuppressWarnings("unchecked") + private static void totalWorkers(List workerWrappers, Set set) { + set.addAll(workerWrappers); + for (WorkerWrapper wrapper : workerWrappers) { + if (wrapper.getNextWrappers() == null) { + continue; + } + List wrappers = wrapper.getNextWrappers(); + totalWorkers(wrappers, set); + } + + } + + /** + * 关闭线程池 + */ + public static void shutDown() { + shutDown(executorService); + } + + /** + * 关闭线程池 + */ + public static void shutDown(ExecutorService executorService) { + if (executorService != null) { + executorService.shutdown(); + } else { + COMMON_POOL.shutdown(); + } + } + + public static String getThreadCount() { + return "activeCount=" + COMMON_POOL.getActiveCount() + + " completedCount " + COMMON_POOL.getCompletedTaskCount() + + " largestCount " + COMMON_POOL.getLargestPoolSize(); + } +} diff --git a/liteflow-async-tool/src/main/java/com/yomahub/liteflow/asynctool/executor/timer/SystemClock.java b/liteflow-async-tool/src/main/java/com/yomahub/liteflow/asynctool/executor/timer/SystemClock.java new file mode 100644 index 000000000..ce1210dc6 --- /dev/null +++ b/liteflow-async-tool/src/main/java/com/yomahub/liteflow/asynctool/executor/timer/SystemClock.java @@ -0,0 +1,51 @@ +package com.yomahub.liteflow.asynctool.executor.timer; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +/** + * 用于解决高并发下System.currentTimeMillis卡顿 + * @author lry + */ +public class SystemClock { + + private final int period; + + private final AtomicLong now; + + private static class InstanceHolder { + private static final SystemClock INSTANCE = new SystemClock(1); + } + + private SystemClock(int period) { + this.period = period; + this.now = new AtomicLong(System.currentTimeMillis()); + scheduleClockUpdating(); + } + + private static SystemClock instance() { + return InstanceHolder.INSTANCE; + } + + private void scheduleClockUpdating() { + ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(runnable -> { + Thread thread = new Thread(runnable, "System Clock"); + thread.setDaemon(true); + return thread; + }); + scheduler.scheduleAtFixedRate(() -> now.set(System.currentTimeMillis()), period, period, TimeUnit.MILLISECONDS); + } + + private long currentTimeMillis() { + return now.get(); + } + + /** + * 用来替换原来的System.currentTimeMillis() + */ + public static long now() { + return instance().currentTimeMillis(); + } +} \ No newline at end of file diff --git a/liteflow-async-tool/src/main/java/com/yomahub/liteflow/asynctool/worker/DependWrapper.java b/liteflow-async-tool/src/main/java/com/yomahub/liteflow/asynctool/worker/DependWrapper.java new file mode 100644 index 000000000..40c781f1f --- /dev/null +++ b/liteflow-async-tool/src/main/java/com/yomahub/liteflow/asynctool/worker/DependWrapper.java @@ -0,0 +1,58 @@ +package com.yomahub.liteflow.asynctool.worker; + +import com.yomahub.liteflow.asynctool.wrapper.WorkerWrapper; + +/** + * 对依赖的wrapper的封装 + * @author wuweifeng wrote on 2019-12-20 + * @version 1.0 + */ +public class DependWrapper { + private WorkerWrapper dependWrapper; + /** + * 是否该依赖必须完成后才能执行自己.

+ * 因为存在一个任务,依赖于多个任务,是让这多个任务全部完成后才执行自己,还是某几个执行完毕就可以执行自己 + * 如 + * 1 + * ---3 + * 2 + * 或 + * 1---3 + * 2---3 + * 这两种就不一样,上面的就是必须12都完毕,才能3 + * 下面的就是1完毕就可以3 + */ + private boolean must = true; + + public DependWrapper(WorkerWrapper dependWrapper, boolean must) { + this.dependWrapper = dependWrapper; + this.must = must; + } + + public DependWrapper() { + } + + public WorkerWrapper getDependWrapper() { + return dependWrapper; + } + + public void setDependWrapper(WorkerWrapper dependWrapper) { + this.dependWrapper = dependWrapper; + } + + public boolean isMust() { + return must; + } + + public void setMust(boolean must) { + this.must = must; + } + + @Override + public String toString() { + return "DependWrapper{" + + "dependWrapper=" + dependWrapper + + ", must=" + must + + '}'; + } +} diff --git a/liteflow-async-tool/src/main/java/com/yomahub/liteflow/asynctool/worker/ResultState.java b/liteflow-async-tool/src/main/java/com/yomahub/liteflow/asynctool/worker/ResultState.java new file mode 100755 index 000000000..fa3643e41 --- /dev/null +++ b/liteflow-async-tool/src/main/java/com/yomahub/liteflow/asynctool/worker/ResultState.java @@ -0,0 +1,12 @@ +package com.yomahub.liteflow.asynctool.worker; + +/** + * 结果状态 + * @author wuweifeng wrote on 2019-11-19. + */ +public enum ResultState { + SUCCESS, + TIMEOUT, + EXCEPTION, + DEFAULT //默认状态 +} diff --git a/liteflow-async-tool/src/main/java/com/yomahub/liteflow/asynctool/worker/WorkResult.java b/liteflow-async-tool/src/main/java/com/yomahub/liteflow/asynctool/worker/WorkResult.java new file mode 100755 index 000000000..1bb02f7de --- /dev/null +++ b/liteflow-async-tool/src/main/java/com/yomahub/liteflow/asynctool/worker/WorkResult.java @@ -0,0 +1,63 @@ +package com.yomahub.liteflow.asynctool.worker; + +/** + * 执行结果 + */ +public class WorkResult { + /** + * 执行的结果 + */ + private V result; + /** + * 结果状态 + */ + private ResultState resultState; + private Exception ex; + + public WorkResult(V result, ResultState resultState) { + this(result, resultState, null); + } + + public WorkResult(V result, ResultState resultState, Exception ex) { + this.result = result; + this.resultState = resultState; + this.ex = ex; + } + + public static WorkResult defaultResult() { + return new WorkResult<>(null, ResultState.DEFAULT); + } + + @Override + public String toString() { + return "WorkResult{" + + "result=" + result + + ", resultState=" + resultState + + ", ex=" + ex + + '}'; + } + + public Exception getEx() { + return ex; + } + + public void setEx(Exception ex) { + this.ex = ex; + } + + public V getResult() { + return result; + } + + public void setResult(V result) { + this.result = result; + } + + public ResultState getResultState() { + return resultState; + } + + public void setResultState(ResultState resultState) { + this.resultState = resultState; + } +} diff --git a/liteflow-async-tool/src/main/java/com/yomahub/liteflow/asynctool/wrapper/WorkerWrapper.java b/liteflow-async-tool/src/main/java/com/yomahub/liteflow/asynctool/wrapper/WorkerWrapper.java new file mode 100755 index 000000000..aeed8c6ce --- /dev/null +++ b/liteflow-async-tool/src/main/java/com/yomahub/liteflow/asynctool/wrapper/WorkerWrapper.java @@ -0,0 +1,610 @@ +package com.yomahub.liteflow.asynctool.wrapper; + + +import com.yomahub.liteflow.asynctool.callback.DefaultCallback; +import com.yomahub.liteflow.asynctool.callback.ICallback; +import com.yomahub.liteflow.asynctool.callback.IWorker; +import com.yomahub.liteflow.asynctool.exception.SkippedException; +import com.yomahub.liteflow.asynctool.executor.timer.SystemClock; +import com.yomahub.liteflow.asynctool.worker.DependWrapper; +import com.yomahub.liteflow.asynctool.worker.ResultState; +import com.yomahub.liteflow.asynctool.worker.WorkResult; + +import java.util.*; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * 对每个worker及callback进行包装,一对一 + * + * @author wuweifeng wrote on 2019-11-19. + */ +public class WorkerWrapper { + /** + * 该wrapper的唯一标识 + */ + private String id; + /** + * worker将来要处理的param + */ + private T param; + private IWorker worker; + private ICallback callback; + /** + * 在自己后面的wrapper,如果没有,自己就是末尾;如果有一个,就是串行;如果有多个,有几个就需要开几个线程

+ * -------2 + * 1 + * -------3 + * 如1后面有2、3 + */ + private List> nextWrappers; + /** + * 依赖的wrappers,有2种情况,1:必须依赖的全部完成后,才能执行自己 2:依赖的任何一个、多个完成了,就可以执行自己 + * 通过must字段来控制是否依赖项必须完成 + * 1 + * -------3 + * 2 + * 1、2执行完毕后才能执行3 + */ + private List dependWrappers; + /** + * 标记该事件是否已经被处理过了,譬如已经超时返回false了,后续rpc又收到返回值了,则不再二次回调 + * 经试验,volatile并不能保证"同一毫秒"内,多线程对该值的修改和拉取 + *

+ * 1-finish, 2-error, 3-working + */ + private AtomicInteger state = new AtomicInteger(0); + /** + * 该map存放所有wrapper的id和wrapper映射 + */ + private Map forParamUseWrappers; + /** + * 也是个钩子变量,用来存临时的结果 + */ + private volatile WorkResult workResult = WorkResult.defaultResult(); + /** + * 是否在执行自己前,去校验nextWrapper的执行结果

+ * 1 4 + * -------3 + * 2 + * 如这种在4执行前,可能3已经执行完毕了(被2执行完后触发的),那么4就没必要执行了。 + * 注意,该属性仅在nextWrapper数量<=1时有效,>1时的情况是不存在的 + */ + private volatile boolean needCheckNextWrapperResult = true; + + private static final int FINISH = 1; + private static final int ERROR = 2; + private static final int WORKING = 3; + private static final int INIT = 0; + + private WorkerWrapper(String id, IWorker worker, T param, ICallback callback) { + if (worker == null) { + throw new NullPointerException("async.worker is null"); + } + this.worker = worker; + this.param = param; + this.id = id; + //允许不设置回调 + if (callback == null) { + callback = new DefaultCallback<>(); + } + this.callback = callback; + } + + /** + * 开始工作 + * fromWrapper代表这次work是由哪个上游wrapper发起的 + */ + private void work(ExecutorService executorService, WorkerWrapper fromWrapper, long remainTime, Map forParamUseWrappers) { + this.forParamUseWrappers = forParamUseWrappers; + //将自己放到所有wrapper的集合里去 + forParamUseWrappers.put(id, this); + long now = SystemClock.now(); + //总的已经超时了,就快速失败,进行下一个 + if (remainTime <= 0) { + fastFail(INIT, null); + beginNext(executorService, now, remainTime); + return; + } + //如果自己已经执行过了。 + //可能有多个依赖,其中的一个依赖已经执行完了,并且自己也已开始执行或执行完毕。当另一个依赖执行完毕,又进来该方法时,就不重复处理了 + if (getState() == FINISH || getState() == ERROR) { + beginNext(executorService, now, remainTime); + return; + } + + //如果在执行前需要校验nextWrapper的状态 + if (needCheckNextWrapperResult) { + //如果自己的next链上有已经出结果或已经开始执行的任务了,自己就不用继续了 + if (!checkNextWrapperResult()) { + fastFail(INIT, new SkippedException()); + beginNext(executorService, now, remainTime); + return; + } + } + + //如果没有任何依赖,说明自己就是第一批要执行的 + if (dependWrappers == null || dependWrappers.size() == 0) { + fire(); + beginNext(executorService, now, remainTime); + return; + } + + /*如果有前方依赖,存在两种情况 + 一种是前面只有一个wrapper。即 A -> B + 一种是前面有多个wrapper。A C D -> B。需要A、C、D都完成了才能轮到B。但是无论是A执行完,还是C执行完,都会去唤醒B。 + 所以需要B来做判断,必须A、C、D都完成,自己才能执行 */ + + //只有一个依赖 + if (dependWrappers.size() == 1) { + doDependsOneJob(fromWrapper); + beginNext(executorService, now, remainTime); + } else { + //有多个依赖时 + doDependsJobs(executorService, dependWrappers, fromWrapper, now, remainTime); + } + + } + + + public void work(ExecutorService executorService, long remainTime, Map forParamUseWrappers) { + work(executorService, null, remainTime, forParamUseWrappers); + } + + /** + * 总控制台超时,停止所有任务 + */ + public void stopNow() { + if (getState() == INIT || getState() == WORKING) { + fastFail(getState(), null); + } + } + + /** + * 判断自己下游链路上,是否存在已经出结果的或已经开始执行的 + * 如果没有返回true,如果有返回false + */ + private boolean checkNextWrapperResult() { + //如果自己就是最后一个,或者后面有并行的多个,就返回true + if (nextWrappers == null || nextWrappers.size() != 1) { + return getState() == INIT; + } + WorkerWrapper nextWrapper = nextWrappers.get(0); + boolean state = nextWrapper.getState() == INIT; + //继续校验自己的next的状态 + return state && nextWrapper.checkNextWrapperResult(); + } + + /** + * 进行下一个任务 + */ + private void beginNext(ExecutorService executorService, long now, long remainTime) { + //花费的时间 + long costTime = SystemClock.now() - now; + if (nextWrappers == null) { + return; + } + if (nextWrappers.size() == 1) { + nextWrappers.get(0).work(executorService, WorkerWrapper.this, remainTime - costTime, forParamUseWrappers); + return; + } + CompletableFuture[] futures = new CompletableFuture[nextWrappers.size()]; + for (int i = 0; i < nextWrappers.size(); i++) { + int finalI = i; + futures[i] = CompletableFuture.runAsync(() -> nextWrappers.get(finalI) + .work(executorService, WorkerWrapper.this, remainTime - costTime, forParamUseWrappers), executorService); + } + try { + CompletableFuture.allOf(futures).get(); + } catch (InterruptedException | ExecutionException e) { + e.printStackTrace(); + } + } + + private void doDependsOneJob(WorkerWrapper dependWrapper) { + if (ResultState.TIMEOUT == dependWrapper.getWorkResult().getResultState()) { + workResult = defaultResult(); + fastFail(INIT, null); + } else if (ResultState.EXCEPTION == dependWrapper.getWorkResult().getResultState()) { + workResult = defaultExResult(dependWrapper.getWorkResult().getEx()); + fastFail(INIT, null); + } else { + //前面任务正常完毕了,该自己了 + fire(); + } + } + + private synchronized void doDependsJobs(ExecutorService executorService, List dependWrappers, WorkerWrapper fromWrapper, long now, long remainTime) { + boolean nowDependIsMust = false; + //创建必须完成的上游wrapper集合 + Set mustWrapper = new HashSet<>(); + for (DependWrapper dependWrapper : dependWrappers) { + if (dependWrapper.isMust()) { + mustWrapper.add(dependWrapper); + } + if (dependWrapper.getDependWrapper().equals(fromWrapper)) { + nowDependIsMust = dependWrapper.isMust(); + } + } + + //如果全部是不必须的条件,那么只要到了这里,就执行自己。 + if (mustWrapper.size() == 0) { + if (ResultState.TIMEOUT == fromWrapper.getWorkResult().getResultState()) { + fastFail(INIT, null); + } else { + fire(); + } + beginNext(executorService, now, remainTime); + return; + } + + //如果存在需要必须完成的,且fromWrapper不是必须的,就什么也不干 + if (!nowDependIsMust) { + return; + } + + //如果fromWrapper是必须的 + boolean existNoFinish = false; + boolean hasError = false; + //先判断前面必须要执行的依赖任务的执行结果,如果有任何一个失败,那就不用走action了,直接给自己设置为失败,进行下一步就是了 + for (DependWrapper dependWrapper : mustWrapper) { + WorkerWrapper workerWrapper = dependWrapper.getDependWrapper(); + WorkResult tempWorkResult = workerWrapper.getWorkResult(); + //为null或者isWorking,说明它依赖的某个任务还没执行到或没执行完 + if (workerWrapper.getState() == INIT || workerWrapper.getState() == WORKING) { + existNoFinish = true; + break; + } + if (ResultState.TIMEOUT == tempWorkResult.getResultState()) { + workResult = defaultResult(); + hasError = true; + break; + } + if (ResultState.EXCEPTION == tempWorkResult.getResultState()) { + workResult = defaultExResult(workerWrapper.getWorkResult().getEx()); + hasError = true; + break; + } + + } + //只要有失败的 + if (hasError) { + fastFail(INIT, null); + beginNext(executorService, now, remainTime); + return; + } + + //如果上游都没有失败,分为两种情况,一种是都finish了,一种是有的在working + //都finish的话 + if (!existNoFinish) { + //上游都finish了,进行自己 + fire(); + beginNext(executorService, now, remainTime); + return; + } + } + + /** + * 执行自己的job.具体的执行是在另一个线程里,但判断阻塞超时是在work线程 + */ + private void fire() { + //阻塞取结果 + workResult = workerDoJob(); + } + + /** + * 快速失败 + */ + private boolean fastFail(int expect, Exception e) { + //试图将它从expect状态,改成Error + if (!compareAndSetState(expect, ERROR)) { + return false; + } + + //尚未处理过结果 + if (checkIsNullResult()) { + if (e == null) { + workResult = defaultResult(); + } else { + workResult = defaultExResult(e); + } + } + + callback.result(false, param, workResult); + return true; + } + + /** + * 具体的单个worker执行任务 + */ + private WorkResult workerDoJob() { + //避免重复执行 + if (!checkIsNullResult()) { + return workResult; + } + try { + //如果已经不是init状态了,说明正在被执行或已执行完毕。这一步很重要,可以保证任务不被重复执行 + if (!compareAndSetState(INIT, WORKING)) { + return workResult; + } + + callback.begin(); + + //执行耗时操作 + V resultValue = worker.action(param, forParamUseWrappers); + + //如果状态不是在working,说明别的地方已经修改了 + if (!compareAndSetState(WORKING, FINISH)) { + return workResult; + } + + workResult.setResultState(ResultState.SUCCESS); + workResult.setResult(resultValue); + //回调成功 + callback.result(true, param, workResult); + + return workResult; + } catch (Exception e) { + //避免重复回调 + if (!checkIsNullResult()) { + return workResult; + } + fastFail(WORKING, e); + return workResult; + } + } + + public WorkResult getWorkResult() { + return workResult; + } + + public List> getNextWrappers() { + return nextWrappers; + } + + public void setParam(T param) { + this.param = param; + } + + private boolean checkIsNullResult() { + return ResultState.DEFAULT == workResult.getResultState(); + } + + private void addDepend(WorkerWrapper workerWrapper, boolean must) { + addDepend(new DependWrapper(workerWrapper, must)); + } + + private void addDepend(DependWrapper dependWrapper) { + if (dependWrappers == null) { + dependWrappers = new ArrayList<>(); + } + //如果依赖的是重复的同一个,就不重复添加了 + for (DependWrapper wrapper : dependWrappers) { + if (wrapper.equals(dependWrapper)) { + return; + } + } + dependWrappers.add(dependWrapper); + } + + private void addNext(WorkerWrapper workerWrapper) { + if (nextWrappers == null) { + nextWrappers = new ArrayList<>(); + } + //避免添加重复 + for (WorkerWrapper wrapper : nextWrappers) { + if (workerWrapper.equals(wrapper)) { + return; + } + } + nextWrappers.add(workerWrapper); + } + + private void addNextWrappers(List> wrappers) { + if (wrappers == null) { + return; + } + for (WorkerWrapper wrapper : wrappers) { + addNext(wrapper); + } + } + + private void addDependWrappers(List dependWrappers) { + if (dependWrappers == null) { + return; + } + for (DependWrapper wrapper : dependWrappers) { + addDepend(wrapper); + } + } + + private WorkResult defaultResult() { + workResult.setResultState(ResultState.TIMEOUT); + workResult.setResult(worker.defaultValue()); + return workResult; + } + + private WorkResult defaultExResult(Exception ex) { + workResult.setResultState(ResultState.EXCEPTION); + workResult.setResult(worker.defaultValue()); + workResult.setEx(ex); + return workResult; + } + + + private int getState() { + return state.get(); + } + + public String getId() { + return id; + } + + private boolean compareAndSetState(int expect, int update) { + return this.state.compareAndSet(expect, update); + } + + private void setNeedCheckNextWrapperResult(boolean needCheckNextWrapperResult) { + this.needCheckNextWrapperResult = needCheckNextWrapperResult; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + WorkerWrapper that = (WorkerWrapper) o; + return needCheckNextWrapperResult == that.needCheckNextWrapperResult && + Objects.equals(param, that.param) && + Objects.equals(worker, that.worker) && + Objects.equals(callback, that.callback) && + Objects.equals(nextWrappers, that.nextWrappers) && + Objects.equals(dependWrappers, that.dependWrappers) && + Objects.equals(state, that.state) && + Objects.equals(workResult, that.workResult); + } + + @Override + public int hashCode() { + return Objects.hash(param, worker, callback, nextWrappers, dependWrappers, state, workResult, needCheckNextWrapperResult); + } + + public static class Builder { + /** + * 该wrapper的唯一标识 + */ + private String id = UUID.randomUUID().toString(); + /** + * worker将来要处理的param + */ + private W param; + private IWorker worker; + private ICallback callback; + /** + * 自己后面的所有 + */ + private List> nextWrappers; + /** + * 自己依赖的所有 + */ + private List dependWrappers; + /** + * 存储强依赖于自己的wrapper集合 + */ + private Set> selfIsMustSet; + + private boolean needCheckNextWrapperResult = true; + + public Builder worker(IWorker worker) { + this.worker = worker; + return this; + } + + public Builder param(W w) { + this.param = w; + return this; + } + + public Builder id(String id) { + if (id != null) { + this.id = id; + } + return this; + } + + public Builder needCheckNextWrapperResult(boolean needCheckNextWrapperResult) { + this.needCheckNextWrapperResult = needCheckNextWrapperResult; + return this; + } + + public Builder callback(ICallback callback) { + this.callback = callback; + return this; + } + + public Builder depend(WorkerWrapper... wrappers) { + if (wrappers == null) { + return this; + } + for (WorkerWrapper wrapper : wrappers) { + depend(wrapper); + } + return this; + } + + public Builder depend(WorkerWrapper wrapper) { + return depend(wrapper, true); + } + + public Builder depend(WorkerWrapper wrapper, boolean isMust) { + if (wrapper == null) { + return this; + } + DependWrapper dependWrapper = new DependWrapper(wrapper, isMust); + if (dependWrappers == null) { + dependWrappers = new ArrayList<>(); + } + dependWrappers.add(dependWrapper); + return this; + } + + public Builder next(WorkerWrapper wrapper) { + return next(wrapper, true); + } + + public Builder next(WorkerWrapper wrapper, boolean selfIsMust) { + if (nextWrappers == null) { + nextWrappers = new ArrayList<>(); + } + nextWrappers.add(wrapper); + + //强依赖自己 + if (selfIsMust) { + if (selfIsMustSet == null) { + selfIsMustSet = new HashSet<>(); + } + selfIsMustSet.add(wrapper); + } + return this; + } + + public Builder next(WorkerWrapper... wrappers) { + if (wrappers == null) { + return this; + } + for (WorkerWrapper wrapper : wrappers) { + next(wrapper); + } + return this; + } + + public WorkerWrapper build() { + WorkerWrapper wrapper = new WorkerWrapper<>(id, worker, param, callback); + wrapper.setNeedCheckNextWrapperResult(needCheckNextWrapperResult); + if (dependWrappers != null) { + for (DependWrapper workerWrapper : dependWrappers) { + workerWrapper.getDependWrapper().addNext(wrapper); + wrapper.addDepend(workerWrapper); + } + } + if (nextWrappers != null) { + for (WorkerWrapper workerWrapper : nextWrappers) { + boolean must = false; + if (selfIsMustSet != null && selfIsMustSet.contains(workerWrapper)) { + must = true; + } + workerWrapper.addDepend(wrapper, must); + wrapper.addNext(workerWrapper); + } + } + + return wrapper; + } + + } +} diff --git a/liteflow-core/pom.xml b/liteflow-core/pom.xml index dd00d8cfb..62df36aad 100644 --- a/liteflow-core/pom.xml +++ b/liteflow-core/pom.xml @@ -13,6 +13,11 @@ + + com.yomahub + liteflow-async-tool + ${project.version} + com.yomahub liteflow-script-common diff --git a/pom.xml b/pom.xml index a199bc53a..7645c0c43 100644 --- a/pom.xml +++ b/pom.xml @@ -39,8 +39,8 @@ - UTF-8 - 1.8 + 8 + 8 2.0.5.RELEASE 5.0.9.RELEASE 1.7.21 @@ -175,16 +175,6 @@ - - org.apache.maven.plugins - maven-compiler-plugin - 3.0 - - UTF-8 - ${java.version} - ${java.version} - - org.apache.maven.plugins maven-surefire-plugin @@ -256,7 +246,8 @@ liteflow-testcase-springnative liteflow-testcase-script-qlexpress liteflow-testcase-script-groovy - + liteflow-async-tool +