From fe0aedc8e2a244a9c0ec7485036cfa195e4c0616 Mon Sep 17 00:00:00 2001 From: bryan31 Date: Mon, 8 Nov 2021 19:37:56 +0800 Subject: [PATCH] =?UTF-8?q?enhancement=20#I4HECS=20=E9=87=8D=E8=AF=95?= =?UTF-8?q?=E6=A8=A1=E5=9D=97=E7=9A=84=E4=BB=A3=E7=A0=81=E9=87=8D=E6=9E=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../yomahub/liteflow/entity/flow/Chain.java | 29 ++----------------- .../yomahub/liteflow/entity/flow/Node.java | 29 +++++++++++++++++-- .../entity/flow/ParallelCallable.java | 27 ++++------------- 3 files changed, 34 insertions(+), 51 deletions(-) diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/Chain.java b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/Chain.java index bc9c6d403..720b337a8 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/Chain.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/Chain.java @@ -78,32 +78,7 @@ public class Chain implements Executable { for (Condition condition : conditionList) { if (condition instanceof ThenCondition) { for (Executable executableItem : condition.getNodeList()) { - if (executableItem.getExecuteType().equals(ExecuteTypeEnum.CHAIN)) { - executableItem.execute(slotIndex); - } else { - int retryCount = ((Node)executableItem).getInstance().getRetryCount(); - List> forExceptions = Arrays.asList(((Node)executableItem).getInstance().getRetryForExceptions()); - for (int i = 0; i <= retryCount; i++) { - try { - if (i > 0) { - LOG.info("[{}]:component[{}] performs {} retry", slot.getRequestId(), executableItem.getExecuteName(), i + 1); - } - executableItem.execute(slotIndex); - break; - } catch (ChainEndException e) { - //如果是ChainEndException,则无需重试 - throw e; - } catch (Exception e) { - //判断抛出的异常是不是指定异常的子类 - boolean flag = forExceptions.stream().anyMatch(clazz -> clazz.isAssignableFrom(e.getClass())); - - //两种情况不重试,1)抛出异常不在指定异常范围内 2)已经重试次数大于等于配置次数 - if (!flag || i >= retryCount){ - throw e; - } - } - } - } + executableItem.execute(slotIndex); } } else if (condition instanceof WhenCondition) { executeAsyncCondition((WhenCondition) condition, slotIndex, slot.getRequestId()); @@ -134,7 +109,7 @@ public class Chain implements Executable { condition.getNodeList().forEach(executable -> { Future future = parallelExecutor.submit( - Objects.requireNonNull(TtlCallable.get(new ParallelCallable(executable, slotIndex, requestId, latch, liteflowConfig.getRetryCount()))) + Objects.requireNonNull(TtlCallable.get(new ParallelCallable(executable, slotIndex, requestId, latch))) ); futureMap.put(executable.getExecuteName(), future); }); diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/Node.java b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/Node.java index 83e5c6667..9fe0bf51c 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/Node.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/Node.java @@ -8,7 +8,9 @@ package com.yomahub.liteflow.entity.flow; import java.text.MessageFormat; +import java.util.Arrays; import java.util.HashMap; +import java.util.List; import java.util.Map; import cn.hutool.core.util.ObjectUtil; @@ -114,8 +116,31 @@ public class Node implements Executable,Cloneable{ instance.setTag(tag); instance.setCondNodeMap(condNodeMap); - //执行业务逻辑的主要入口 - instance.execute(); + //这里开始进行重试的逻辑和主逻辑的运行 + int retryCount = instance.getRetryCount(); + List> forExceptions = Arrays.asList(instance.getRetryForExceptions()); + for (int i = 0; i <= retryCount; i++) { + try { + if (i > 0) { + LOG.info("[{}]:component[{}] performs {} retry", slot.getRequestId(), id, i + 1); + } + //执行业务逻辑的主要入口 + instance.execute(); + break; + } catch (ChainEndException e) { + //如果是ChainEndException,则无需重试 + throw e; + } catch (Exception e) { + //判断抛出的异常是不是指定异常的子类 + boolean flag = forExceptions.stream().anyMatch(clazz -> clazz.isAssignableFrom(e.getClass())); + + //两种情况不重试,1)抛出异常不在指定异常范围内 2)已经重试次数大于等于配置次数 + if (!flag || i >= retryCount){ + throw e; + } + } + } + //如果组件覆盖了isEnd方法,或者在在逻辑中主要调用了setEnd(true)的话,流程就会立马结束 if (instance.isEnd()) { diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/ParallelCallable.java b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/ParallelCallable.java index ed4aaee1f..82ee1c3f5 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/ParallelCallable.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/ParallelCallable.java @@ -22,37 +22,20 @@ public class ParallelCallable implements Callable { private final CountDownLatch latch; - private final int retryCount; - - public ParallelCallable(Executable executableItem, Integer slotIndex, String requestId, CountDownLatch latch, int retryCount) { + public ParallelCallable(Executable executableItem, Integer slotIndex, String requestId, CountDownLatch latch) { this.executableItem = executableItem; this.slotIndex = slotIndex; this.requestId = requestId; this.latch = latch; - this.retryCount = retryCount; } @Override public Boolean call() throws Exception { try { - boolean flag = true; - for (int i = 0; i <= retryCount; i++) { - try{ - if (i > 0){ - LOG.info("[{}]:component[{}] performs {} retry", requestId, executableItem.getExecuteName(), i+1); - } - executableItem.execute(slotIndex); - flag = true; - break; - }catch (Exception e){ - if (i >= retryCount){ - LOG.error("requestId [{}], item [{}] execute error", requestId, executableItem.getExecuteName()); - flag = false; - break; - } - } - } - return flag; + executableItem.execute(slotIndex); + return true; + } catch (Exception e){ + return false; } finally { latch.countDown(); }