From 5e0aef934974f0d3b9ea2a60ead7b6e6a08beff1 Mon Sep 17 00:00:00 2001 From: bryan31 Date: Tue, 4 Jan 2022 20:58:34 +0800 Subject: [PATCH] =?UTF-8?q?enhancement=20#I4OTK4=20=E5=B8=8C=E6=9C=9Bfinal?= =?UTF-8?q?ly=E7=BB=84=E4=BB=B6=E5=8F=AF=E4=BB=A5=E8=8E=B7=E5=8F=96?= =?UTF-8?q?=E5=88=B0then=E7=BB=84=E4=BB=B6=E7=9A=84=E5=BC=82=E5=B8=B8?= =?UTF-8?q?=E5=AF=B9=E8=B1=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../yomahub/liteflow/core/FlowExecutor.java | 10 +++- .../yomahub/liteflow/entity/flow/Chain.java | 57 +++++++++---------- .../flow/parallel/ParallelSupplier.java | 5 +- .../PreAndFinallySpringbootTest.java | 8 +++ .../test/preAndFinally/cmp/Finally3Cmp.java | 28 +++++++++ .../src/test/resources/preAndFinally/flow.xml | 5 ++ 6 files changed, 79 insertions(+), 34 deletions(-) create mode 100644 liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/preAndFinally/cmp/Finally3Cmp.java diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/core/FlowExecutor.java b/liteflow-core/src/main/java/com/yomahub/liteflow/core/FlowExecutor.java index 6966bef5a..67449e997 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/core/FlowExecutor.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/core/FlowExecutor.java @@ -360,7 +360,7 @@ public class FlowExecutor { chain = FlowBus.getChain(chainId); if (ObjectUtil.isNull(chain)) { - String errorMsg = StrUtil.format("couldn't find chain with the id[{}]", chainId); + String errorMsg = StrUtil.format("[{}]:couldn't find chain with the id[{}]", slot.getRequestId(), chainId); throw new ChainNotFoundException(errorMsg); } @@ -372,6 +372,14 @@ public class FlowExecutor { } slot.setException(e); } finally { + try{ + if (ObjectUtil.isNotNull(chain)){ + chain.executeFinally(slotIndex); + } + }catch (Exception e){ + LOG.error("[{}]:an exception occurred during the finally Component execution in chain[{}]", slot.getRequestId(), chain.getChainName()); + } + if (!isInnerChain) { slot.printStep(); DataBus.releaseSlot(slotIndex); diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/Chain.java b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/Chain.java index 4568bd40c..773671389 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/Chain.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/Chain.java @@ -70,33 +70,30 @@ public class Chain implements Executable { throw new FlowSystemException("no conditionList in this chain[" + chainName + "]"); } - Slot slot = DataBus.getSlot(slotIndex); + //循环chain里包含的condition,每一个condition分四种类型:pre,then,when,finally + //这里conditionList其实已经是有序的,pre一定在最前面,finally一定在最后面 + for (Condition condition : conditionList) { + if (condition instanceof PreCondition){ + for (Executable executableItem : condition.getNodeList()) { + executableItem.execute(slotIndex); + } + } else if (condition instanceof ThenCondition) { + for (Executable executableItem : condition.getNodeList()) { + executableItem.execute(slotIndex); + } + } else if (condition instanceof WhenCondition) { + executeAsyncCondition((WhenCondition) condition, slotIndex); + } + } + } + public void executeFinally(Integer slotIndex) throws Exception { //先把finally的节点过滤出来 List finallyConditionList = conditionList.stream().filter(condition -> condition.getConditionType().equals(ConditionTypeEnum.TYPE_FINALLY.getType())).collect(Collectors.toList()); - - //循环chain里包含的condition,每一个condition分四种类型:pre,then,when,finally - //这里conditionList其实已经是有序的,pre一定在最前面,finally一定在最后面 - try{ - for (Condition condition : conditionList) { - if (condition instanceof PreCondition){ - for (Executable executableItem : condition.getNodeList()) { - executableItem.execute(slotIndex); - } - } else if (condition instanceof ThenCondition) { - for (Executable executableItem : condition.getNodeList()) { - executableItem.execute(slotIndex); - } - } else if (condition instanceof WhenCondition) { - executeAsyncCondition((WhenCondition) condition, slotIndex, slot.getRequestId()); - } - } - }finally { - for (Condition finallyCondition : finallyConditionList){ - for(Executable executableItem : finallyCondition.getNodeList()){ - executableItem.execute(slotIndex); - } + for (Condition finallyCondition : finallyConditionList){ + for(Executable executableItem : finallyCondition.getNodeList()){ + executableItem.execute(slotIndex); } } } @@ -113,7 +110,9 @@ public class Chain implements Executable { //使用线程池执行when并发流程 //这块涉及到挺多的多线程逻辑,所以注释比较详细,看到这里的童鞋可以仔细阅读 - private void executeAsyncCondition(WhenCondition condition, Integer slotIndex, String requestId) throws Exception{ + private void executeAsyncCondition(WhenCondition condition, Integer slotIndex) throws Exception{ + Slot slot = DataBus.getSlot(slotIndex); + //此方法其实只会初始化一次Executor,不会每次都会初始化。Executor是唯一的 ExecutorService parallelExecutor = ExecutorHelper.loadInstance().buildExecutor(); @@ -131,7 +130,7 @@ public class Chain implements Executable { List> completableFutureList = condition.getNodeList().stream().map( executable -> CompletableFutureTimeout.completeOnTimeout( WhenFutureObj.timeOut(executable.getExecuteName()), - CompletableFuture.supplyAsync(new ParallelSupplier(executable, slotIndex, requestId), parallelExecutor), + CompletableFuture.supplyAsync(new ParallelSupplier(executable, slotIndex), parallelExecutor), liteflowConfig.getWhenMaxWaitSeconds(), TimeUnit.SECONDS ) @@ -186,24 +185,24 @@ public class Chain implements Executable { //输出超时信息 timeOutWhenFutureObjList.forEach(whenFutureObj -> - LOG.warn("requestId [{}] executing thread has reached max-wait-seconds, thread canceled.Execute-item: [{}]", requestId, whenFutureObj.getExecutorName())); + LOG.warn("requestId [{}] executing thread has reached max-wait-seconds, thread canceled.Execute-item: [{}]", slot.getRequestId(), whenFutureObj.getExecutorName())); //当配置了errorResume = false,出现interrupted或者!f.get()的情况,将抛出WhenExecuteException if (!condition.isErrorResume()) { if (interrupted[0]) { - throw new WhenExecuteException(StrUtil.format("requestId [{}] when execute interrupted. errorResume [false].", requestId)); + throw new WhenExecuteException(StrUtil.format("requestId [{}] when execute interrupted. errorResume [false].", slot.getRequestId())); } //循环判断CompletableFuture的返回值,如果异步执行失败,则抛出相应的业务异常 for(WhenFutureObj whenFutureObj : allCompletableWhenFutureObjList){ if (!whenFutureObj.isSuccess()){ - LOG.info(StrUtil.format("requestId [{}] when-executor[{}] execute failed. errorResume [false].", whenFutureObj.getExecutorName(), requestId)); + LOG.info(StrUtil.format("requestId [{}] when-executor[{}] execute failed. errorResume [false].", whenFutureObj.getExecutorName(), slot.getRequestId())); throw whenFutureObj.getEx(); } } } else if (interrupted[0]) { // 这里由于配置了errorResume,所以只打印warn日志 - LOG.warn("requestId [{}] executing when condition timeout , but ignore with errorResume.", requestId); + LOG.warn("requestId [{}] executing when condition timeout , but ignore with errorResume.", slot.getRequestId()); } } } diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/parallel/ParallelSupplier.java b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/parallel/ParallelSupplier.java index 74bca2cff..43a6c640d 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/parallel/ParallelSupplier.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/parallel/ParallelSupplier.java @@ -18,12 +18,9 @@ public class ParallelSupplier implements Supplier { private final Integer slotIndex; - private final String requestId; - - public ParallelSupplier(Executable executableItem, Integer slotIndex, String requestId) { + public ParallelSupplier(Executable executableItem, Integer slotIndex) { this.executableItem = executableItem; this.slotIndex = slotIndex; - this.requestId = requestId; } @Override diff --git a/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/preAndFinally/PreAndFinallySpringbootTest.java b/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/preAndFinally/PreAndFinallySpringbootTest.java index c91c1eb13..d992c50b6 100644 --- a/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/preAndFinally/PreAndFinallySpringbootTest.java +++ b/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/preAndFinally/PreAndFinallySpringbootTest.java @@ -53,4 +53,12 @@ public class PreAndFinallySpringbootTest extends BaseTest { Assert.assertFalse(response.isSuccess()); Assert.assertEquals("p1==>p2==>a==>d==>f1==>f2", response.getSlot().printStep()); } + + //测试在finally节点里是否能获取exception + @Test + public void testPreAndFinally4() throws Exception{ + LiteflowResponse response = flowExecutor.execute2Resp("chain4", "arg"); + Assert.assertFalse(response.isSuccess()); + Assert.assertTrue(response.getSlot().getData("hasEx")); + } } diff --git a/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/preAndFinally/cmp/Finally3Cmp.java b/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/preAndFinally/cmp/Finally3Cmp.java new file mode 100644 index 000000000..97f9a0415 --- /dev/null +++ b/liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/preAndFinally/cmp/Finally3Cmp.java @@ -0,0 +1,28 @@ +/** + *

Title: liteflow

+ *

Description: 轻量级的组件式流程框架

+ * @author Bryan.Zhang + * @email weenyc31@163.com + * @Date 2020/4/1 + */ +package com.yomahub.liteflow.test.preAndFinally.cmp; + +import cn.hutool.core.util.ObjectUtil; +import com.yomahub.liteflow.core.NodeComponent; +import com.yomahub.liteflow.entity.data.Slot; +import org.springframework.stereotype.Component; + +@Component("f3") +public class Finally3Cmp extends NodeComponent { + + @Override + public void process() throws Exception{ + Slot slot = this.getSlot(); + if (ObjectUtil.isNull(slot.getException())){ + slot.setData("hasEx", false); + }else{ + slot.setData("hasEx", true); + } + System.out.println("Finally3Cmp executed!"); + } +} diff --git a/liteflow-testcase-springboot/src/test/resources/preAndFinally/flow.xml b/liteflow-testcase-springboot/src/test/resources/preAndFinally/flow.xml index eaf33f46c..5dac9fb0a 100644 --- a/liteflow-testcase-springboot/src/test/resources/preAndFinally/flow.xml +++ b/liteflow-testcase-springboot/src/test/resources/preAndFinally/flow.xml @@ -18,4 +18,9 @@ + + + + + \ No newline at end of file