From 868d7ef20e8fbc942eb9a05b047dfb600f507f2f Mon Sep 17 00:00:00 2001 From: bryan31 Date: Fri, 2 Jul 2021 19:57:12 +0800 Subject: [PATCH] =?UTF-8?q?bug=20#I3YX3Z=20=E5=BD=93=E6=8A=9B=E5=87=BA?= =?UTF-8?q?=E5=BC=82=E5=B8=B8=E6=97=B6=EF=BC=8CLiteflowResponse=E4=B8=AD?= =?UTF-8?q?=E7=9A=84Slot=E4=B8=BAnull?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- liteflow-core/pom.xml | 2 +- .../yomahub/liteflow/core/FlowExecutor.java | 153 ++++++++++-------- .../yomahub/liteflow/entity/data/AbsSlot.java | 12 ++ .../yomahub/liteflow/entity/data/Slot.java | 44 ++--- .../yomahub/liteflow/entity/flow/Chain.java | 19 +-- liteflow-spring-boot-starter/pom.xml | 2 +- .../exception/FlowExecutorSpringBootTest.java | 10 ++ .../liteflow/test/exception/cmp1/DCmp.java | 27 ++++ .../test/flowmeta/FlowMetaSpringbootTest.java | 2 +- .../src/test/resources/exception/flow.xml | 4 + liteflow-test-spring/pom.xml | 2 +- liteflow-test-springboot/pom.xml | 2 +- pom.xml | 2 +- 13 files changed, 177 insertions(+), 104 deletions(-) create mode 100644 liteflow-spring-boot-starter/src/test/java/com/yomahub/liteflow/test/exception/cmp1/DCmp.java diff --git a/liteflow-core/pom.xml b/liteflow-core/pom.xml index 945e4672b..673456552 100644 --- a/liteflow-core/pom.xml +++ b/liteflow-core/pom.xml @@ -9,7 +9,7 @@ com.yomahub liteflow - 2.5.7 + 2.5.8 diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/core/FlowExecutor.java b/liteflow-core/src/main/java/com/yomahub/liteflow/core/FlowExecutor.java index a0dccbe5c..fb2a38864 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/core/FlowExecutor.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/core/FlowExecutor.java @@ -1,6 +1,7 @@ /** *

Title: liteflow

*

Description: 轻量级的组件式流程框架

+ * * @author Bryan.Zhang * @email weenyc31@163.com * @Date 2020/4/1 @@ -37,6 +38,7 @@ import java.util.List; /** * 流程规则主要执行器类 + * * @author Bryan.Zhang */ public class FlowExecutor { @@ -73,7 +75,7 @@ public class FlowExecutor { for (String path : rulePath) { try { FlowParserTypeEnum pattern = matchFormatConfig(path); - if(ObjectUtil.isNotNull(pattern)) { + if (ObjectUtil.isNotNull(pattern)) { path = ReUtil.replaceAll(path, PREFIX_FORMATE_CONFIG_REGEX, ""); switch (pattern) { case TYPE_XML: @@ -89,12 +91,12 @@ public class FlowExecutor { LOG.error("can't support the format {}", path); } } - if(ObjectUtil.isNotNull(parser)) { + if (ObjectUtil.isNotNull(parser)) { parser.parseMain(path); } else { throw new ConfigErrorException("parse error, please check liteflow config property"); } - } catch (Exception e) { + } catch (Exception e) { String errorMsg = MessageFormat.format("init flow executor cause error,can not parse rule file {0}", path); LOG.error(errorMsg, e); throw new FlowExecutorNotInitException(errorMsg); @@ -104,13 +106,14 @@ public class FlowExecutor { /** * 匹配路径配置,生成对应的解析器 - * @param path 配置路径 + * + * @param path 配置路径 * @param pattern 格式 * @return */ private FlowParser matchFormatParser(String path, FlowParserTypeEnum pattern) throws ClassNotFoundException, IllegalAccessException, InstantiationException { boolean isLocalFile = isLocalConfig(path); - if(isLocalFile) { + if (isLocalFile) { LOG.info("flow info loaded from local file,path={},format type={}", path, pattern.getType()); switch (pattern) { case TYPE_XML: @@ -121,7 +124,7 @@ public class FlowExecutor { return new LocalYmlFlowParser(); default: } - } else if(isClassConfig(path)){ + } else if (isClassConfig(path)) { LOG.info("flow info loaded from class config,class={},format type={}", path, pattern.getType()); Class c = Class.forName(path); switch (pattern) { @@ -133,7 +136,7 @@ public class FlowExecutor { return (YmlFlowParser) SpringAware.registerBean(c); default: } - } else if(isZKConfig(path)) { + } else if (isZKConfig(path)) { LOG.info("flow info loaded from Zookeeper,zkNode={},format type={}", path, pattern.getType()); switch (pattern) { case TYPE_XML: @@ -151,17 +154,19 @@ public class FlowExecutor { /** * 判定是否为本地文件 + * * @param path * @return */ private boolean isLocalConfig(String path) { return ReUtil.isMatch(LOCAL_XML_CONFIG_REGEX, path) || ReUtil.isMatch(LOCAL_JSON_CONFIG_REGEX, path) - || ReUtil.isMatch(LOCAL_YML_CONFIG_REGEX, path) ; + || ReUtil.isMatch(LOCAL_YML_CONFIG_REGEX, path); } /** * 判定是否为自定义class配置 + * * @param path * @return */ @@ -171,6 +176,7 @@ public class FlowExecutor { /** * 判定是否为zk配置 + * * @param path * @return */ @@ -180,24 +186,25 @@ public class FlowExecutor { /** * 匹配文本格式,支持xml,json和yml + * * @param path * @return */ private FlowParserTypeEnum matchFormatConfig(String path) { - if(ReUtil.isMatch(LOCAL_XML_CONFIG_REGEX, path) || ReUtil.isMatch(FORMATE_XML_CONFIG_REGEX, path)) { + if (ReUtil.isMatch(LOCAL_XML_CONFIG_REGEX, path) || ReUtil.isMatch(FORMATE_XML_CONFIG_REGEX, path)) { return FlowParserTypeEnum.TYPE_XML; - } else if(ReUtil.isMatch(LOCAL_JSON_CONFIG_REGEX, path) || ReUtil.isMatch(FORMATE_JSON_CONFIG_REGEX, path)) { + } else if (ReUtil.isMatch(LOCAL_JSON_CONFIG_REGEX, path) || ReUtil.isMatch(FORMATE_JSON_CONFIG_REGEX, path)) { return FlowParserTypeEnum.TYPE_JSON; - } else if(ReUtil.isMatch(LOCAL_YML_CONFIG_REGEX, path) || ReUtil.isMatch(FORMATE_YML_CONFIG_REGEX, path)) { + } else if (ReUtil.isMatch(LOCAL_YML_CONFIG_REGEX, path) || ReUtil.isMatch(FORMATE_YML_CONFIG_REGEX, path)) { return FlowParserTypeEnum.TYPE_YML; - } else if(isClassConfig(path)) { + } else if (isClassConfig(path)) { try { Class clazz = Class.forName(path); - if(ClassXmlFlowParser.class.isAssignableFrom(clazz)) { + if (ClassXmlFlowParser.class.isAssignableFrom(clazz)) { return FlowParserTypeEnum.TYPE_XML; - } else if(ClassJsonFlowParser.class.isAssignableFrom(clazz)) { + } else if (ClassJsonFlowParser.class.isAssignableFrom(clazz)) { return FlowParserTypeEnum.TYPE_JSON; - } else if(ClassYmlFlowParser.class.isAssignableFrom(clazz)) { + } else if (ClassYmlFlowParser.class.isAssignableFrom(clazz)) { return FlowParserTypeEnum.TYPE_YML; } } catch (ClassNotFoundException e) { @@ -210,9 +217,10 @@ public class FlowExecutor { public void reloadRule() { init(); } - + /** * callback by implicit subflow + * * @param chainId * @param param * @param slotClazz @@ -224,54 +232,54 @@ public class FlowExecutor { Integer slotIndex) throws Exception { this.execute(chainId, param, slotClazz, slotIndex, true); } - + public DefaultSlot execute(String chainId, Object param) throws Exception { return this.execute(chainId, param, DefaultSlot.class, null, false); } - + public T execute(String chainId, Object param, Class slotClazz) throws Exception { return this.execute(chainId, param, slotClazz, null, false); } - + public T execute(String chainId, Object param, Class slotClazz, - Integer slotIndex, boolean isInnerChain) throws Exception { - return this.doExecute(chainId, param, slotClazz, slotIndex, isInnerChain); + Integer slotIndex, boolean isInnerChain) throws Exception { + T slot = this.doExecute(chainId, param, slotClazz, slotIndex, isInnerChain); + if (ObjectUtil.isNotNull(slot.getException())) { + throw slot.getException(); + } else { + return slot; + } } - + public LiteflowResponse execute2Resp(String chainId, Object param) { return this.execute2Resp(chainId, param, DefaultSlot.class); } - public LiteflowResponse execute2Resp(String chainId, Object param, Class slotClazz) { + public LiteflowResponse execute2Resp(String chainId, Object param, Class slotClazz) { return this.execute2Resp(chainId, param, slotClazz, null, false); } - + public LiteflowResponse execute2Resp(String chainId, Object param, Class slotClazz, Integer slotIndex, - boolean isInnerChain) { + boolean isInnerChain) { LiteflowResponse response = new LiteflowResponse<>(); - try { - T slot = doExecute(chainId, param, slotClazz, slotIndex, isInnerChain); - response.setSlot(slot); - } catch (Exception ex) { + + T slot = doExecute(chainId, param, slotClazz, slotIndex, isInnerChain); + + if (ObjectUtil.isNotNull(slot.getException())) { response.setSuccess(false); - response.setMessage(ex.getMessage()); - response.setCause(ex.fillInStackTrace()); - LOG.error("chain execute exception", ex); + response.setMessage(slot.getException().getMessage()); + response.setCause(slot.getException()); + } else { + response.setSuccess(true); } + response.setSlot(slot); return response; } - + private T doExecute(String chainId, Object param, Class slotClazz, Integer slotIndex, - boolean isInnerChain) throws Exception { - if (FlowBus.needInit()) { - init(); - } - - Chain chain = FlowBus.getChain(chainId); - - if (ObjectUtil.isNull(chain)) { - String errorMsg = MessageFormat.format("couldn't find chain with the id[{0}]", chainId); - throw new ChainNotFoundException(errorMsg); + boolean isInnerChain) { + if (FlowBus.needInit()) { + init(); } if (!isInnerChain && ObjectUtil.isNull(slotIndex)) { @@ -283,36 +291,47 @@ public class FlowExecutor { throw new NoAvailableSlotException("there is no available slot"); } - T slot = DataBus.getSlot(slotIndex); - if (ObjectUtil.isNull(slot)) { - throw new NoAvailableSlotException("the slot is not exist"); - } + T slot = DataBus.getSlot(slotIndex); + if (ObjectUtil.isNull(slot)) { + throw new NoAvailableSlotException("the slot is not exist"); + } if (StrUtil.isBlank(slot.getRequestId())) { slot.generateRequestId(); LOG.info("requestId[{}] has generated", slot.getRequestId()); } - if (!isInnerChain) { - slot.setRequestData(param); - slot.setChainName(chainId); - } else { - slot.setChainReqData(chainId, param); - } - try { - // 执行chain - chain.execute(slotIndex); - } catch (Exception e) { - LOG.error("[{}]:chain[{}] execute error on slot[{}]", slot.getRequestId(), chain.getChainName(), slotIndex); - throw e; - } finally { - if (!isInnerChain) { - slot.printStep(); - DataBus.releaseSlot(slotIndex); - } - } - return slot; - } + if (!isInnerChain) { + slot.setRequestData(param); + slot.setChainName(chainId); + } else { + slot.setChainReqData(chainId, param); + } + + Chain chain = null; + try { + chain = FlowBus.getChain(chainId); + + if (ObjectUtil.isNull(chain)) { + String errorMsg = StrUtil.format("couldn't find chain with the id[{}]", chainId); + throw new ChainNotFoundException(errorMsg); + } + + // 执行chain + chain.execute(slotIndex); + } catch (Exception e) { + if (ObjectUtil.isNotNull(chain)){ + LOG.error("[{}]:chain[{}] execute error on slot[{}]", slot.getRequestId(), chain.getChainName(), slotIndex); + } + slot.setException(e); + } finally { + if (!isInnerChain) { + slot.printStep(); + DataBus.releaseSlot(slotIndex); + } + } + return slot; + } public String getZkNode() { return zkNode; diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/entity/data/AbsSlot.java b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/data/AbsSlot.java index 299d83d36..824bdec99 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/entity/data/AbsSlot.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/data/AbsSlot.java @@ -40,6 +40,8 @@ public abstract class AbsSlot implements Slot { private static final String REQUEST_ID = "req_id"; + private static final String EXCEPTION = "exception"; + private Deque executeSteps = new ArrayDeque(); protected ConcurrentHashMap dataMap = new ConcurrentHashMap(); @@ -139,4 +141,14 @@ public abstract class AbsSlot implements Slot { public Deque getExecuteSteps() { return executeSteps; } + + @Override + public Exception getException() { + return (Exception) this.dataMap.get(EXCEPTION); + } + + @Override + public void setException(Exception e) { + this.dataMap.put(EXCEPTION, e); + } } diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/entity/data/Slot.java b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/data/Slot.java index 9f91e770d..202f50b8d 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/entity/data/Slot.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/data/Slot.java @@ -12,43 +12,47 @@ package com.yomahub.liteflow.entity.data; * @author Bryan.Zhang */ public interface Slot { - public T getInput(String nodeId); + T getInput(String nodeId); - public T getOutput(String nodeId); + T getOutput(String nodeId); - public void setInput(String nodeId,T t); + void setInput(String nodeId,T t); - public void setOutput(String nodeId,T t); + void setOutput(String nodeId,T t); - public T getRequestData(); + T getRequestData(); - public void setRequestData(T t); + void setRequestData(T t); - public T getResponseData(); + T getResponseData(); - public void setChainReqData(String chainId, T t); + void setChainReqData(String chainId, T t); - public T getChainReqData(String chainId); + T getChainReqData(String chainId); - public void setResponseData(T t); + void setResponseData(T t); - public T getData(String key); + T getData(String key); - public void setData(String key, T t); + void setData(String key, T t); - public void setCondResult(String key, T t); + void setCondResult(String key, T t); - public T getCondResult(String key); + T getCondResult(String key); - public void addStep(CmpStep step); + void addStep(CmpStep step); - public String printStep(); + String printStep(); - public void generateRequestId(); + void generateRequestId(); - public String getRequestId(); + String getRequestId(); - public void setChainName(String chainName); + void setChainName(String chainName); - public String getChainName(); + String getChainName(); + + void setException(Exception e); + + Exception getException(); } diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/Chain.java b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/Chain.java index c61b4330b..59ddda939 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/Chain.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/Chain.java @@ -10,6 +10,7 @@ package com.yomahub.liteflow.entity.flow; import cn.hutool.core.collection.CollUtil; import cn.hutool.core.util.ObjectUtil; +import cn.hutool.core.util.StrUtil; import com.yomahub.liteflow.entity.data.DataBus; import com.yomahub.liteflow.entity.data.Slot; import com.yomahub.liteflow.enums.ExecuteTypeEnum; @@ -143,31 +144,27 @@ public class Chain implements Executable { interrupted = true; } - /** - * 当配置了errorResume = false,出现interrupted或者!f.get()的情况,将抛出WhenExecuteException - */ + //当配置了errorResume = false,出现interrupted或者!f.get()的情况,将抛出WhenExecuteException if (!condition.isErrorResume()) { if (interrupted) { - throw new WhenExecuteException(String.format( - "requestId [%s] when execute interrupted. errorResume [false].", requestId)); + throw new WhenExecuteException(StrUtil.format( + "requestId [{}] when execute interrupted. errorResume [false].", requestId)); } for (Future f : futures) { try { if (!f.get()) { - throw new WhenExecuteException(String.format( - "requestId [%s] when execute failed. errorResume [false].", requestId)); + throw new WhenExecuteException(StrUtil.format( + "requestId [{}] when execute failed. errorResume [false].", requestId)); } } catch (InterruptedException | ExecutionException e) { - throw new WhenExecuteException(String.format( - "requestId [%s] when execute failed. errorResume [false].", requestId)); + throw new WhenExecuteException(StrUtil.format( + "requestId [{}] when execute failed. errorResume [false].", requestId)); } } - } else if (interrupted) { // 这里由于配置了errorResume,所以只打印warn日志 LOG.warn("requestId [{}] executing when condition timeout , but ignore with errorResume.", requestId); } - } } diff --git a/liteflow-spring-boot-starter/pom.xml b/liteflow-spring-boot-starter/pom.xml index 3bf30b88d..b16f8e26f 100644 --- a/liteflow-spring-boot-starter/pom.xml +++ b/liteflow-spring-boot-starter/pom.xml @@ -10,7 +10,7 @@ liteflow com.yomahub - 2.5.7 + 2.5.8 diff --git a/liteflow-spring-boot-starter/src/test/java/com/yomahub/liteflow/test/exception/FlowExecutorSpringBootTest.java b/liteflow-spring-boot-starter/src/test/java/com/yomahub/liteflow/test/exception/FlowExecutorSpringBootTest.java index f26b34d3c..8935aa9e7 100644 --- a/liteflow-spring-boot-starter/src/test/java/com/yomahub/liteflow/test/exception/FlowExecutorSpringBootTest.java +++ b/liteflow-spring-boot-starter/src/test/java/com/yomahub/liteflow/test/exception/FlowExecutorSpringBootTest.java @@ -1,7 +1,9 @@ package com.yomahub.liteflow.test.exception; import com.yomahub.liteflow.core.FlowExecutor; +import com.yomahub.liteflow.entity.data.DefaultSlot; import com.yomahub.liteflow.entity.data.LiteflowResponse; +import com.yomahub.liteflow.entity.data.Slot; import com.yomahub.liteflow.exception.ChainNotFoundException; import com.yomahub.liteflow.exception.ConfigErrorException; import com.yomahub.liteflow.exception.FlowExecutorNotInitException; @@ -72,6 +74,14 @@ public class FlowExecutorSpringBootTest extends BaseTest { Assert.assertEquals("no conditionList in this chain[chain2]", response.getMessage()); ReflectionUtils.rethrowException(response.getCause()); } + + @Test + public void testGetSlotFromResponseWhenException() throws Exception{ + LiteflowResponse response = flowExecutor.execute2Resp("chain4", "test"); + Assert.assertFalse(response.isSuccess()); + Assert.assertNotNull(response.getCause()); + Assert.assertNotNull(response.getSlot()); + } // @Test(expected = WhenExecuteException.class) // public void testWhenExecuteTimeoutException() throws Exception { diff --git a/liteflow-spring-boot-starter/src/test/java/com/yomahub/liteflow/test/exception/cmp1/DCmp.java b/liteflow-spring-boot-starter/src/test/java/com/yomahub/liteflow/test/exception/cmp1/DCmp.java new file mode 100644 index 000000000..cd59be68b --- /dev/null +++ b/liteflow-spring-boot-starter/src/test/java/com/yomahub/liteflow/test/exception/cmp1/DCmp.java @@ -0,0 +1,27 @@ +/** + *

Title: liteflow

+ *

Description: 轻量级的组件式流程框架

+ * @author Bryan.Zhang + * @email weenyc31@163.com + * @Date 2020/4/1 + */ +package com.yomahub.liteflow.test.exception.cmp1; + +import com.yomahub.liteflow.core.NodeComponent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +@Component("d") +public class DCmp extends NodeComponent { + + private static final Logger LOG = LoggerFactory.getLogger(DCmp.class); + + @Override + public void process() { + if(1==1){ + int a = 1/0; + } + LOG.info("Dcomp executed!"); + } +} diff --git a/liteflow-spring-boot-starter/src/test/java/com/yomahub/liteflow/test/flowmeta/FlowMetaSpringbootTest.java b/liteflow-spring-boot-starter/src/test/java/com/yomahub/liteflow/test/flowmeta/FlowMetaSpringbootTest.java index f3d7b00f1..b6b98ffeb 100644 --- a/liteflow-spring-boot-starter/src/test/java/com/yomahub/liteflow/test/flowmeta/FlowMetaSpringbootTest.java +++ b/liteflow-spring-boot-starter/src/test/java/com/yomahub/liteflow/test/flowmeta/FlowMetaSpringbootTest.java @@ -27,7 +27,7 @@ public class FlowMetaSpringbootTest extends BaseTest { @Resource private FlowExecutor flowExecutor; - //测试自定义AOP,串行场景 + //测试动态添加元信息节点 @Test public void testFlowMeta() { FlowBus.addNode("d", DCmp.class); diff --git a/liteflow-spring-boot-starter/src/test/resources/exception/flow.xml b/liteflow-spring-boot-starter/src/test/resources/exception/flow.xml index 82d25a129..e975ef328 100644 --- a/liteflow-spring-boot-starter/src/test/resources/exception/flow.xml +++ b/liteflow-spring-boot-starter/src/test/resources/exception/flow.xml @@ -10,4 +10,8 @@ + + + + \ No newline at end of file diff --git a/liteflow-test-spring/pom.xml b/liteflow-test-spring/pom.xml index c1eccb842..f854c9e09 100644 --- a/liteflow-test-spring/pom.xml +++ b/liteflow-test-spring/pom.xml @@ -9,7 +9,7 @@ liteflow com.yomahub - 2.5.7 + 2.5.8 diff --git a/liteflow-test-springboot/pom.xml b/liteflow-test-springboot/pom.xml index 22bef1270..ecf7605b5 100644 --- a/liteflow-test-springboot/pom.xml +++ b/liteflow-test-springboot/pom.xml @@ -9,7 +9,7 @@ liteflow com.yomahub - 2.5.7 + 2.5.8 diff --git a/pom.xml b/pom.xml index 74f84fff0..d7c0b055c 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ com.yomahub liteflow pom - 2.5.7 + 2.5.8 liteflow a lightweight and practical micro-process framework https://github.com/bryan31/liteflow