From 617440db259f364268e22b56bfe3376c2f4cca4d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BE=90=E4=BD=B3?= Date: Fri, 26 Mar 2021 21:38:13 +0800 Subject: [PATCH] =?UTF-8?q?enhancement=20#I37QVR=20WhenCondition=E6=97=B6?= =?UTF-8?q?=E5=80=99=EF=BC=8C=E5=B9=B6=E5=8F=91=E6=89=A7=E8=A1=8C=E7=9B=AE?= =?UTF-8?q?=E5=89=8D=E4=BC=9A=E6=AF=8F=E6=AC=A1=E6=96=B0=E5=BB=BA=E7=BA=BF?= =?UTF-8?q?=E7=A8=8B=E5=8F=AF=E4=B8=8D=E5=8F=AF=E8=B5=B0=E7=BA=BF=E7=A8=8B?= =?UTF-8?q?=E6=B1=A0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../yomahub/liteflow/entity/flow/Chain.java | 40 ++++++++++++++++--- ...elCondition.java => ParallelCallable.java} | 15 ++++--- .../liteflow/entity/flow/WhenCondition.java | 12 ++++++ .../exception/WhenExecuteException.java | 21 ++++++++++ .../liteflow/parser/XmlFlowParser.java | 8 +++- .../yomahub/liteflow/util/ExecutorHelper.java | 2 +- .../com/yomahub/liteflow/util}/Shutdown.java | 16 ++------ .../LiteflowExecutorAutoConfiguration.java | 8 +++- .../src/main/resources/applicationContext.xml | 2 + .../flowtest/concurrent/TestParseFlow.java | 38 ++++++++++-------- .../flowtest/concurrent/TestRunFlow.java | 19 +++++++-- .../mock/component/p/P5Component.java | 4 +- .../src/test/resources/config/flow-test.xml | 17 +++++--- 13 files changed, 148 insertions(+), 54 deletions(-) rename liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/{ParallelCondition.java => ParallelCallable.java} (69%) create mode 100644 liteflow-core/src/main/java/com/yomahub/liteflow/exception/WhenExecuteException.java rename {liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot => liteflow-core/src/main/java/com/yomahub/liteflow/util}/Shutdown.java (62%) diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/Chain.java b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/Chain.java index f6433dc7c..c1e3bf76b 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/Chain.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/Chain.java @@ -13,6 +13,7 @@ import com.yomahub.liteflow.entity.data.DataBus; import com.yomahub.liteflow.entity.data.Slot; import com.yomahub.liteflow.enums.ExecuteTypeEnum; import com.yomahub.liteflow.exception.FlowSystemException; +import com.yomahub.liteflow.exception.WhenExecuteException; import com.yomahub.liteflow.property.LiteflowConfig; import com.yomahub.liteflow.util.SpringAware; import org.apache.commons.collections4.CollectionUtils; @@ -110,7 +111,7 @@ public class Chain implements Executable { // 使用线程池执行when并发流程 private void executeAsyncCondition(WhenCondition condition, Integer slotIndex, String requestId) { - ExecutorService parallelExecutor = SpringAware.getBean(ExecutorService.class); + ExecutorService parallelExecutor = SpringAware.getBean("whenExecutors"); final CountDownLatch latch = new CountDownLatch(condition.getNodeList().size()); final List> futures = new ArrayList<>(condition.getNodeList().size()); @@ -118,21 +119,48 @@ public class Chain implements Executable { for (int i = 0; i < condition.getNodeList().size(); i++) { futures.add(parallelExecutor.submit( - new ParallelCondition(condition.getNodeList().get(i), slotIndex, requestId, latch) + new ParallelCallable(condition.getNodeList().get(i), slotIndex, requestId, latch) )); } + boolean interrupted = false; try { if (!latch.await(whenMaxWaitSeconds, TimeUnit.SECONDS)) { for (Future f : futures) { f.cancel(true); } - LOG.warn("requestId [{}] executing async condition has reached max-wait-seconds, condition canceled and move to next executableItem." - , requestId); + interrupted = true; + LOG.warn("requestId [{}] executing async condition has reached max-wait-seconds, condition canceled.", requestId); } } catch (InterruptedException e) { - // ignore InterruptedException - + interrupted = true; } + + /** + * 当配置了errorResume = false,出现interrupted或者!f.get()的情况,将抛出WhenExecuteException + */ + if (!condition.isErrorResume()) { + if (interrupted) { + throw new WhenExecuteException(String.format( + "requestId [%s] when execute interrupted. errorResume [false].", requestId)); + } + + for (Future f : futures) { + try { + if (!f.get()) { + throw new WhenExecuteException(String.format( + "requestId [%s] when execute failed. errorResume [false].", requestId)); + } + } catch (InterruptedException | ExecutionException e) { + throw new WhenExecuteException(String.format( + "requestId [%s] when execute failed. errorResume [false].", requestId)); + } + } + + } else if (interrupted) { + // 这里由于配置了errorResume,所以只打印warn日志 + LOG.warn("requestId [{}] executing when condition timeout , but ignore with errorResume.", requestId); + } + } } diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/ParallelCondition.java b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/ParallelCallable.java similarity index 69% rename from liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/ParallelCondition.java rename to liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/ParallelCallable.java index cbfe0e1e3..97b74e133 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/ParallelCondition.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/ParallelCallable.java @@ -10,9 +10,9 @@ import java.util.concurrent.CountDownLatch; * 并行器线程 * @author Bryan.Zhang */ -public class ParallelCondition implements Callable { +public class ParallelCallable implements Callable { - private static final Logger LOG = LoggerFactory.getLogger(ParallelCondition.class); + private static final Logger LOG = LoggerFactory.getLogger(ParallelCallable.class); private Executable executableItem; @@ -20,9 +20,10 @@ public class ParallelCondition implements Callable { private String requestId; + private CountDownLatch latch; - public ParallelCondition(Executable executableItem, Integer slotIndex, String requestId, CountDownLatch latch) { + public ParallelCallable(Executable executableItem, Integer slotIndex, String requestId, CountDownLatch latch) { this.executableItem = executableItem; this.slotIndex = slotIndex; this.requestId = requestId; @@ -33,12 +34,14 @@ public class ParallelCondition implements Callable { public Boolean call() throws Exception { try { executableItem.execute(slotIndex); + + return true; }catch(Exception e){ - LOG.error("requestId [{}], item [{}] execute cause error", requestId, executableItem.getExecuteName(), e); + LOG.error("requestId [{}], item [{}] execute error", requestId, executableItem.getExecuteName()); + + return false; } finally { latch.countDown(); } - - return true; } } diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/WhenCondition.java b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/WhenCondition.java index b2ccd929c..13a202537 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/WhenCondition.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/WhenCondition.java @@ -14,8 +14,20 @@ import java.util.List; * @author Bryan.Zhang */ public class WhenCondition extends Condition{ + // 增加errorResume属性,以区分当when调用链调用失败时是否继续往下执行 + private boolean errorResume; public WhenCondition(List nodeList) { super(nodeList); + errorResume = true; + } + + public WhenCondition(List nodeList, boolean errorResume) { + super(nodeList); + this.errorResume = errorResume; + } + + public boolean isErrorResume() { + return errorResume; } } diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/exception/WhenExecuteException.java b/liteflow-core/src/main/java/com/yomahub/liteflow/exception/WhenExecuteException.java new file mode 100644 index 000000000..ebe3f8662 --- /dev/null +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/exception/WhenExecuteException.java @@ -0,0 +1,21 @@ +package com.yomahub.liteflow.exception; + + +public class WhenExecuteException extends RuntimeException { + private static final long serialVersionUID = 1L; + + /** 异常信息 */ + private String message; + + public WhenExecuteException(String message) { + this.message = message; + } + + public String getMessage() { + return message; + } + + public void setMessage(String message) { + this.message = message; + } +} diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/parser/XmlFlowParser.java b/liteflow-core/src/main/java/com/yomahub/liteflow/parser/XmlFlowParser.java index 372524838..fba1f8570 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/parser/XmlFlowParser.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/parser/XmlFlowParser.java @@ -13,6 +13,7 @@ import com.yomahub.liteflow.exception.ExecutableItemNotFoundException; import com.yomahub.liteflow.exception.ParseException; import com.yomahub.liteflow.util.SpringAware; import org.apache.commons.lang3.StringUtils; +import org.dom4j.Attribute; import org.dom4j.Document; import org.dom4j.DocumentHelper; import org.dom4j.Element; @@ -139,7 +140,12 @@ public abstract class XmlFlowParser { if (condE.getName().equals("then")) { conditionList.add(new ThenCondition(chainNodeList)); } else if (condE.getName().equals("when")) { - conditionList.add(new WhenCondition(chainNodeList)); + Attribute errorResume = condE.attribute("errorResume"); + if (errorResume != null) { + conditionList.add(new WhenCondition(chainNodeList, errorResume.getValue().equals("true"))); + } else { + conditionList.add(new WhenCondition(chainNodeList)); + } } } FlowBus.addChain(chainName, new Chain(chainName,conditionList)); diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/util/ExecutorHelper.java b/liteflow-core/src/main/java/com/yomahub/liteflow/util/ExecutorHelper.java index a381ec7cb..e093869a7 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/util/ExecutorHelper.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/util/ExecutorHelper.java @@ -13,7 +13,7 @@ import java.util.concurrent.atomic.AtomicLong; /** * 线程池工具类 - * @author justin.xu + * @author Bryan.Zhang */ public class ExecutorHelper { diff --git a/liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/Shutdown.java b/liteflow-core/src/main/java/com/yomahub/liteflow/util/Shutdown.java similarity index 62% rename from liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/Shutdown.java rename to liteflow-core/src/main/java/com/yomahub/liteflow/util/Shutdown.java index 9450ed6c4..5076944c0 100644 --- a/liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/Shutdown.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/util/Shutdown.java @@ -1,34 +1,26 @@ -package com.yomahub.liteflow.springboot; +package com.yomahub.liteflow.util; -import com.yomahub.liteflow.util.ExecutorHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.core.annotation.Order; -import org.springframework.stereotype.Component; import javax.annotation.PreDestroy; -import javax.annotation.Resource; import java.util.concurrent.ExecutorService; /** * 关闭shutdown类 * 执行清理工作 - * @author justin.xu + * @author Bryan.Zhang */ -@Order(Integer.MIN_VALUE) -@Component public class Shutdown { private static final Logger LOG = LoggerFactory.getLogger(Shutdown.class); - @Resource - private ExecutorService executorService; - @PreDestroy public void destroy() throws Exception { + ExecutorService executorService = SpringAware.getBean("whenExecutors"); + LOG.info("Start closing the liteflow-when-calls..."); ExecutorHelper.shutdownAwaitTermination(executorService); LOG.info("Succeed closing the liteflow-when-calls ok..."); } - } diff --git a/liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/LiteflowExecutorAutoConfiguration.java b/liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/LiteflowExecutorAutoConfiguration.java index 6f6785e3b..ee2d0deaf 100644 --- a/liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/LiteflowExecutorAutoConfiguration.java +++ b/liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/LiteflowExecutorAutoConfiguration.java @@ -2,6 +2,7 @@ package com.yomahub.liteflow.springboot; import com.yomahub.liteflow.property.LiteflowConfig; import com.yomahub.liteflow.util.ExecutorHelper; +import com.yomahub.liteflow.util.Shutdown; import org.springframework.boot.autoconfigure.AutoConfigureAfter; import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.context.annotation.Bean; @@ -19,7 +20,7 @@ import java.util.concurrent.ExecutorService; @AutoConfigureAfter({LiteflowPropertyAutoConfiguration.class}) public class LiteflowExecutorAutoConfiguration { - @Bean + @Bean("whenExecutors") public ExecutorService executorService(LiteflowConfig liteflowConfig) { int useWorker = liteflowConfig.getWhenMaxWorkers(); int useQueue = liteflowConfig.getWhenQueueLimit(); @@ -33,4 +34,9 @@ public class LiteflowExecutorAutoConfiguration { return ExecutorHelper.buildExecutor(useWorker, useQueue, "liteflow-when-calls", false); } + + @Bean + public Shutdown shutdown() { + return new Shutdown(); + } } diff --git a/liteflow-test-spring/src/main/resources/applicationContext.xml b/liteflow-test-spring/src/main/resources/applicationContext.xml index 57c18aa26..03744237b 100644 --- a/liteflow-test-spring/src/main/resources/applicationContext.xml +++ b/liteflow-test-spring/src/main/resources/applicationContext.xml @@ -37,4 +37,6 @@ + + diff --git a/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/TestParseFlow.java b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/TestParseFlow.java index 00ba958a3..ff647fbfb 100644 --- a/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/TestParseFlow.java +++ b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/TestParseFlow.java @@ -28,22 +28,25 @@ import java.util.List; @SpringBootTest public class TestParseFlow { - private Check caseAsync = new Check("async", Arrays.asList( - ThenCondition.class, - WhenCondition.class, - WhenCondition.class, - WhenCondition.class + private Check caseErrorResume = new Check("test-errorResume", Arrays.asList( + new AbstractMap.SimpleEntry, Boolean>(ThenCondition.class, null), + new AbstractMap.SimpleEntry, Boolean>(WhenCondition.class, true), + new AbstractMap.SimpleEntry, Boolean>(WhenCondition.class, true), + new AbstractMap.SimpleEntry, Boolean>(WhenCondition.class, true) )); - private Check caseConcurrent = new Check("async-concurrent1", Collections.singletonList( - WhenCondition.class + private Check caseErrorBreak = new Check("test-errorBreak", Arrays.asList( + new AbstractMap.SimpleEntry, Boolean>(ThenCondition.class, null), + new AbstractMap.SimpleEntry, Boolean>(WhenCondition.class, true), + new AbstractMap.SimpleEntry, Boolean>(WhenCondition.class, false), + new AbstractMap.SimpleEntry, Boolean>(WhenCondition.class, true) )); @Test public void parseWhen() throws Exception { - assertTrue(caseAsync, FlowBus.getChain(caseAsync.getChainCode())); + assertTrue(caseErrorResume, FlowBus.getChain(caseErrorResume.getChainCode())); - assertTrue(caseConcurrent, FlowBus.getChain(caseConcurrent.getChainCode())); + assertTrue(caseErrorBreak, FlowBus.getChain(caseErrorBreak.getChainCode())); } private void assertTrue(Check check, Chain chain) { @@ -52,28 +55,31 @@ public class TestParseFlow { Assert.assertTrue(null != chain.getConditionList() && !chain.getConditionList().isEmpty()); for (int i = 0; i < chain.getConditionList().size(); i ++) { - Class expected = check.getConditionClazz().get(i); + AbstractMap.SimpleEntry, Boolean> expected = check.getClazzWithFlags().get(i); Condition actual = chain.getConditionList().get(i); - Assert.assertEquals(expected, actual.getClass()); + Assert.assertEquals(expected.getKey(), actual.getClass()); + if (actual.getClass().equals(WhenCondition.class)) { + Assert.assertEquals(expected.getValue(), ((WhenCondition) actual).isErrorResume()); + } } } public static class Check { private String chainCode; - private List> conditionClazz; + private List, Boolean>> clazzWithFlags; - public Check(String chainCode, List> conditionClazz) { + public Check(String chainCode, List, Boolean>> clazzWithFlags) { this.chainCode = chainCode; - this.conditionClazz = conditionClazz; + this.clazzWithFlags = clazzWithFlags; } public String getChainCode() { return chainCode; } - public List> getConditionClazz() { - return conditionClazz; + public List, Boolean>> getClazzWithFlags() { + return clazzWithFlags; } } } diff --git a/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/TestRunFlow.java b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/TestRunFlow.java index bcb90b578..fad9ca62c 100644 --- a/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/TestRunFlow.java +++ b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/TestRunFlow.java @@ -38,16 +38,29 @@ public class TestRunFlow { } @Test - public void mixedRunTest() throws Exception { - String requestId = init(Arrays.asList("s1", "s2", "s3", "s4", "s5", "s6", "p3", "p4", "p5", "p6", "p7", "p8")); + public void mixedRunByErrorResumeTest() throws Exception { + //由于errorResume,即使p5执行失败抛出异常, p7, p8也将会执行 + String requestId = init(Arrays.asList("s1", "s2", "s3", "s4", "s5", "s6", "p3", "p4", "p6", "p7", "p8")); - flowExecutor.execute("async", requestId); + flowExecutor.execute("test-errorResume", requestId); + + caseAssertRandom(requestId); + } + + + @Test + public void mixedRunByErrorBreakTest() throws Exception { + //由于errorBreak,p5执行失败抛出异常, p7, p8将不会执行 + String requestId = init(Arrays.asList("s1", "s2", "s3", "s4", "s5", "s6", "p3", "p4", "p6")); + + flowExecutor.execute("test-errorBreak", requestId); caseAssertRandom(requestId); } @Test public void parallelTest() throws InterruptedException { + //测试2个线程并发时,所执行的序列是正常的,线程安全的(slotIndex在每个执行序列chain中都是不变的) String requestId1 = init(Arrays.asList("c1", "c2", "c3", "c4", "c5")); String requestId2 = init(Arrays.asList("c6", "c7", "c8", "c9", "c10")); diff --git a/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/p/P5Component.java b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/p/P5Component.java index c222c5876..c4463fc26 100644 --- a/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/p/P5Component.java +++ b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/p/P5Component.java @@ -1,6 +1,5 @@ package com.yomahub.flowtest.concurrent.mock.component.p; -import com.yomahub.flowtest.concurrent.ConcurrentCase; import com.yomahub.liteflow.core.NodeComponent; import org.springframework.stereotype.Component; @@ -15,7 +14,6 @@ public class P5Component extends NodeComponent { @Override public void process() throws Exception { - ConcurrentCase.caseAdd((String) getSlot().getRequestData(), new ConcurrentCase.Routers(getSlotIndex(), name)); - System.out.println(String.format("[%s] component executed, index[%d].", name, getSlotIndex())); + throw new RuntimeException(String.format("test mock error [%s]", name)); } } \ No newline at end of file diff --git a/liteflow-test-springboot/src/test/resources/config/flow-test.xml b/liteflow-test-springboot/src/test/resources/config/flow-test.xml index a2e66e396..7bdc30870 100644 --- a/liteflow-test-springboot/src/test/resources/config/flow-test.xml +++ b/liteflow-test-springboot/src/test/resources/config/flow-test.xml @@ -1,12 +1,19 @@ - - + + - - - + + + + + + + + + +