feature enhancement #I49FDK 中断重试目前是全局的,希望增加针对个别组件和特定exception

This commit is contained in:
bryan31
2021-09-22 20:09:48 +08:00
parent 21d4a4180e
commit 23f5cfb866
11 changed files with 187 additions and 75 deletions

View File

@@ -0,0 +1,26 @@
package com.yomahub.liteflow.annotation;
import org.springframework.core.annotation.AliasFor;
import org.springframework.stereotype.Component;
import java.lang.annotation.*;
/**
* LiteFlow组件重试次数
*
* @author Bryan.Zhang
* @since 2.6.0
*/
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RetryCount {
@AliasFor(value = "retry")
int value() default 0;
@AliasFor(value = "value")
int retry() default 0;
Class<? extends Exception>[] forExceptions() default {Exception.class};
}

View File

@@ -0,0 +1,59 @@
package com.yomahub.liteflow.core;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.yomahub.liteflow.annotation.LiteflowComponent;
import com.yomahub.liteflow.annotation.RetryCount;
import com.yomahub.liteflow.enums.NodeTypeEnum;
import com.yomahub.liteflow.property.LiteflowConfig;
import com.yomahub.liteflow.property.LiteflowConfigGetter;
/**
* 组件初始化器
* @author Bryan.Zhang
* @since 2.6.0
*/
public class ComponentInitializer {
private static ComponentInitializer instance;
public static ComponentInitializer loadInstance(){
if (ObjectUtil.isNull(instance)){
instance = new ComponentInitializer();
}
return instance;
}
public NodeComponent initComponent(NodeComponent nodeComponent, NodeTypeEnum type, String desc, String nodeId){
nodeComponent.setNodeId(nodeId);
nodeComponent.setSelf(nodeComponent);
nodeComponent.setType(type);
//先取传进来的name值(配置文件中配置的)再看有没有配置LiteflowComponent标注
nodeComponent.setName(desc);
if (nodeComponent.getType().equals(NodeTypeEnum.COMMON) && StrUtil.isBlank(nodeComponent.getName())){
//判断NodeComponent是否是标识了@LiteflowComponent的标注
//如果标注了那么要从中取到name字段
LiteflowComponent liteflowComponent = nodeComponent.getClass().getAnnotation(LiteflowComponent.class);
if (ObjectUtil.isNotNull(liteflowComponent)) {
String name = liteflowComponent.name();
if (StrUtil.isNotBlank(name)) {
nodeComponent.setName(name);
}
}
}
//先从组件上取@RetryCount标注如果没有则看全局配置全局配置如果不配置的话默认是0
//默认retryForExceptions为Exception.class
RetryCount retryCountAnnotation = nodeComponent.getClass().getAnnotation(RetryCount.class);
if (ObjectUtil.isNotNull(retryCountAnnotation)) {
nodeComponent.setRetryCount(retryCountAnnotation.retry());
nodeComponent.setRetryForExceptions(retryCountAnnotation.forExceptions());
} else {
LiteflowConfig liteflowConfig = LiteflowConfigGetter.get();
nodeComponent.setRetryCount(liteflowConfig.getRetryCount());
}
return nodeComponent;
}
}

View File

@@ -50,6 +50,12 @@ public abstract class NodeComponent {
//为何要设置这个用this不行么因为如果有aop去切的话this在spring的aop里是切不到的。self对象有可能是代理过的对象
private NodeComponent self;
//重试次数
private int retryCount = 0;
//在目标异常抛出时才重试
private Class<? extends Exception>[] retryForExceptions = new Class[]{Exception.class};
//是否结束整个流程,这个只对串行流程有效,并行流程无效
private final TransmittableThreadLocal<Boolean> isEndTL = new TransmittableThreadLocal<>();
@@ -191,4 +197,20 @@ public abstract class NodeComponent {
public <T> T getPrivateDeliveryData(){
return this.getSlot().getPrivateDeliveryData(this.getNodeId());
}
public int getRetryCount() {
return retryCount;
}
public void setRetryCount(int retryCount) {
this.retryCount = retryCount;
}
public Class<? extends Exception>[] getRetryForExceptions() {
return retryForExceptions;
}
public void setRetryForExceptions(Class<? extends Exception>[] retryForExceptions) {
this.retryForExceptions = retryForExceptions;
}
}

View File

@@ -10,6 +10,8 @@ package com.yomahub.liteflow.entity.data;
import cn.hutool.core.util.StrUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import sun.lwawt.macosx.CSystemTray;
import java.util.Deque;
import java.util.Iterator;
import java.util.Queue;
@@ -99,7 +101,7 @@ public abstract class AbsSlot implements Slot {
public <T> void setPrivateDeliveryData(String nodeId, T t){
String privateDKey = PRIVATE_DELIVERY_PREFIX + nodeId;
synchronized (nodeId){
synchronized (this){
if (dataMap.containsKey(privateDKey)){
Queue<T> queue = (Queue<T>) dataMap.get(privateDKey);
queue.add(t);

View File

@@ -26,11 +26,15 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;
import java.util.function.Consumer;
import java.util.function.Predicate;
/**
* chain对象实现可执行器
*
* @author Bryan.Zhang
*/
public class Chain implements Executable {
@@ -69,8 +73,6 @@ public class Chain implements Executable {
throw new FlowSystemException("no conditionList in this chain[" + chainName + "]");
}
LiteflowConfig liteflowConfig = LiteflowConfigGetter.get();
Slot slot = DataBus.getSlot(slotIndex);
//循环chain里包含的condition每一个condition有可能是then也有可能是when
@@ -78,20 +80,29 @@ public class Chain implements Executable {
for (Condition condition : conditionList) {
if (condition instanceof ThenCondition) {
for (Executable executableItem : condition.getNodeList()) {
//进行重试循环判断如果重试次数为0则只进行一次循环
for (int i = 0; i <= liteflowConfig.getRetryCount(); i++) {
try {
if (i > 0) {
LOG.info("[{}]:component[{}] performs {} retry", slot.getRequestId(), executableItem.getExecuteName(), i + 1);
}
executableItem.execute(slotIndex);
break;
} catch (ChainEndException e){
//如果是ChainEndException则无需重试
throw e;
} catch (Exception e) {
if (i >= liteflowConfig.getRetryCount()){
if (executableItem.getExecuteType().equals(ExecuteTypeEnum.CHAIN)) {
executableItem.execute(slotIndex);
} else {
int retryCount = ((Node)executableItem).getInstance().getRetryCount();
List<Class<? extends Exception>> forExceptions = Arrays.asList(((Node)executableItem).getInstance().getRetryForExceptions());
for (int i = 0; i <= retryCount; i++) {
try {
if (i > 0) {
LOG.info("[{}]:component[{}] performs {} retry", slot.getRequestId(), executableItem.getExecuteName(), i + 1);
}
executableItem.execute(slotIndex);
break;
} catch (ChainEndException e) {
//如果是ChainEndException则无需重试
throw e;
} catch (Exception e) {
//判断抛出的异常是不是指定异常的子类
boolean flag = forExceptions.stream().anyMatch(clazz -> clazz.isAssignableFrom(e.getClass()));
//两种情况不重试1)抛出异常不在指定异常范围内 2)已经重试次数大于等于配置次数
if (!flag || i >= retryCount){
throw e;
}
}
}
}

View File

@@ -11,6 +11,7 @@ package com.yomahub.liteflow.flow;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.yomahub.liteflow.core.ComponentInitializer;
import com.yomahub.liteflow.core.NodeComponent;
import com.yomahub.liteflow.core.ScriptComponent;
import com.yomahub.liteflow.core.ScriptCondComponent;
@@ -71,9 +72,9 @@ public class FlowBus {
return nodeMap.containsKey(nodeId);
}
public static void addCommonNode(String nodeId, Node node) {
public static void addSpringScanNode(String nodeId, NodeComponent nodeComponent) {
if (containNode(nodeId)) return;
nodeMap.put(nodeId, node);
nodeMap.put(nodeId, new Node(ComponentInitializer.loadInstance().initComponent(nodeComponent, NodeTypeEnum.COMMON, null, nodeId)));
}
public static void addCommonNode(String nodeId, String name, String cmpClazzStr) throws Exception {
@@ -82,8 +83,8 @@ public class FlowBus {
addNode(nodeId, name, NodeTypeEnum.COMMON, cmpClazz, null);
}
public static void addCommonNode(String nodeId, Class<? extends NodeComponent> cmpClazz){
addNode(nodeId, null, NodeTypeEnum.COMMON, cmpClazz, null);
public static void addCommonNode(String nodeId, String name, Class<? extends NodeComponent> cmpClazz){
addNode(nodeId, name, NodeTypeEnum.COMMON, cmpClazz, null);
}
public static void addCommonScriptNode(String nodeId, String name, String script){
@@ -106,10 +107,9 @@ public class FlowBus {
LOG.warn("couldn't find component class [{}] from spring context", cmpClazz.getName());
cmpInstance = cmpClazz.newInstance();
}
cmpInstance.setNodeId(nodeId);
cmpInstance.setName(name);
cmpInstance.setSelf(cmpInstance);
cmpInstance.setType(type);
//进行初始化
cmpInstance = ComponentInitializer.loadInstance().initComponent(cmpInstance, type, name, nodeId);
//如果是脚本节点(普通脚本节点/条件脚本节点)则还要加载script脚本
if (StrUtil.isNotBlank(script)){

View File

@@ -52,7 +52,7 @@ public abstract class JsonFlowParser extends FlowParser {
try {
for (Map.Entry<String, NodeComponent> componentEntry : ComponentScanner.nodeComponentMap.entrySet()) {
if (!FlowBus.containNode(componentEntry.getKey())) {
FlowBus.addCommonNode(componentEntry.getKey(), new Node(componentEntry.getValue()));
FlowBus.addSpringScanNode(componentEntry.getKey(), componentEntry.getValue());
}
}

View File

@@ -58,7 +58,7 @@ public abstract class XmlFlowParser extends FlowParser {
//先进行Spring上下文中的节点的判断
for (Entry<String, NodeComponent> componentEntry : ComponentScanner.nodeComponentMap.entrySet()) {
if (!FlowBus.containNode(componentEntry.getKey())) {
FlowBus.addCommonNode(componentEntry.getKey(), new Node(componentEntry.getValue()));
FlowBus.addSpringScanNode(componentEntry.getKey(), componentEntry.getValue());
}
}

View File

@@ -1,6 +1,7 @@
/**
* <p>Title: liteflow</p>
* <p>Description: 轻量级的组件式流程框架</p>
*
* @author Bryan.Zhang
* @email weenyc31@163.com
* @Date 2020/4/1
@@ -10,16 +11,20 @@ package com.yomahub.liteflow.spring;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.yomahub.liteflow.annotation.LiteflowComponent;
import com.yomahub.liteflow.annotation.RetryCount;
import com.yomahub.liteflow.aop.ICmpAroundAspect;
import com.yomahub.liteflow.core.ComponentInitializer;
import com.yomahub.liteflow.core.NodeComponent;
import com.yomahub.liteflow.enums.NodeTypeEnum;
import com.yomahub.liteflow.property.LiteflowConfig;
import com.yomahub.liteflow.property.LiteflowConfigGetter;
import com.yomahub.liteflow.util.LOGOPrinter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import java.util.HashMap;
import java.util.Map;
import java.util.*;
/**
* 组件扫描类只要是NodeComponent的实现类都可以被这个扫描器扫到
@@ -27,56 +32,43 @@ import java.util.Map;
*/
public class ComponentScanner implements BeanPostProcessor {
private static final Logger LOG = LoggerFactory.getLogger(ComponentScanner.class);
private static final Logger LOG = LoggerFactory.getLogger(ComponentScanner.class);
public static Map<String, NodeComponent> nodeComponentMap = new HashMap<>();
public static Map<String, NodeComponent> nodeComponentMap = new HashMap<>();
public static ICmpAroundAspect cmpAroundAspect;
public static ICmpAroundAspect cmpAroundAspect;
static {
// 打印liteflow的LOGO
LOGOPrinter.print();
}
static {
// 打印liteflow的LOGO
LOGOPrinter.print();
}
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
return bean;
}
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
return bean;
}
@SuppressWarnings("rawtypes")
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
Class clazz = bean.getClass();
// 组件的扫描发现扫到之后缓存到类属性map中
if (NodeComponent.class.isAssignableFrom(clazz)) {
LOG.info("component[{}] has been found", beanName);
NodeComponent nodeComponent = (NodeComponent) bean;
nodeComponent.setNodeId(beanName);
@SuppressWarnings("rawtypes")
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
Class clazz = bean.getClass();
// 组件的扫描发现扫到之后缓存到类属性map中
if (NodeComponent.class.isAssignableFrom(clazz)) {
LOG.info("component[{}] has been found", beanName);
NodeComponent nodeComponent = (NodeComponent) bean;
nodeComponentMap.put(beanName, nodeComponent);
}
//判断NodeComponent是否是标识了@LiteflowComponent的标注
//如果标注了那么要从中取到name字段
LiteflowComponent liteflowComponent = bean.getClass().getAnnotation(LiteflowComponent.class);
if (ObjectUtil.isNotNull(liteflowComponent)){
String name = liteflowComponent.name();
if (StrUtil.isNotBlank(name)){
nodeComponent.setName(name);
}
}
// 组件Aop的实现类加载
if (ICmpAroundAspect.class.isAssignableFrom(clazz)) {
LOG.info("component aspect implement[{}] has been found", beanName);
cmpAroundAspect = (ICmpAroundAspect) bean;
}
nodeComponent.setSelf(nodeComponent);
nodeComponentMap.put(beanName, nodeComponent);
}
return bean;
}
// 组件Aop的实现类加载
if (ICmpAroundAspect.class.isAssignableFrom(clazz)) {
LOG.info("component aspect implement[{}] has been found", beanName);
cmpAroundAspect = (ICmpAroundAspect) bean;
}
return bean;
}
public static void cleanCache(){
nodeComponentMap.clear();
}
public static void cleanCache() {
nodeComponentMap.clear();
}
}

View File

@@ -24,7 +24,7 @@ public class GroovyScriptExecutor implements ScriptExecutor {
private ScriptEngine scriptEngine;
private Map<String, CompiledScript> compiledScriptMap = new HashMap<>();
private final Map<String, CompiledScript> compiledScriptMap = new HashMap<>();
@Override
public ScriptExecutor init() {

View File

@@ -30,9 +30,9 @@ public class FlowMetaSpringbootTest extends BaseTest {
//测试动态添加元信息节点
@Test
public void testFlowMeta() {
FlowBus.addCommonNode("d", DCmp.class);
FlowBus.addCommonNode("d", "d组件", DCmp.class);
LiteflowResponse<DefaultSlot> response= flowExecutor.execute2Resp("chain1", "it's a request");
Assert.assertTrue(response.isSuccess());
Assert.assertEquals("a==>b==>c==>d", response.getSlot().printStep());
Assert.assertEquals("a==>b==>c==>d[d组件]", response.getSlot().printStep());
}
}