diff --git a/liteflow-core/pom.xml b/liteflow-core/pom.xml index 745417a84..04ab79e44 100644 --- a/liteflow-core/pom.xml +++ b/liteflow-core/pom.xml @@ -9,7 +9,7 @@ com.yomahub liteflow - 2.5.5 + 2.5.6 diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/entity/data/DataBus.java b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/data/DataBus.java index 5e072fad4..8051fc0bd 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/entity/data/DataBus.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/data/DataBus.java @@ -7,15 +7,18 @@ */ package com.yomahub.liteflow.entity.data; -import java.util.concurrent.atomic.AtomicInteger; - import cn.hutool.core.util.ObjectUtil; -import com.yomahub.liteflow.exception.ConfigErrorException; import com.yomahub.liteflow.property.LiteflowConfig; import com.yomahub.liteflow.util.SpringAware; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReferenceArray; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + /** * 数据BUS,主要用来管理Slot,用以分配和回收 * @author Bryan.Zhang @@ -26,7 +29,9 @@ public class DataBus { public static AtomicInteger OCCUPY_COUNT = new AtomicInteger(0); - private static Slot[] slots; + private static AtomicReferenceArray SLOTS; + + private static ConcurrentLinkedQueue QUEUE; static { LiteflowConfig liteflowConfig = SpringAware.getBean(LiteflowConfig.class); @@ -36,20 +41,22 @@ public class DataBus { liteflowConfig = new LiteflowConfig(); } int slotSize = liteflowConfig.getSlotSize(); - slots = new Slot[slotSize]; + + SLOTS = new AtomicReferenceArray<>(slotSize); + + QUEUE = IntStream.range(0, slotSize - 1).boxed().collect(Collectors.toCollection(ConcurrentLinkedQueue::new)); } - public synchronized static int offerSlot(Class slotClazz){ - try{ - for(int i = 0; i < slots.length; i++){ - if(ObjectUtil.isNull(slots[i])){ - slots[i] = slotClazz.newInstance(); - OCCUPY_COUNT.incrementAndGet(); - return i; - } + public static int offerSlot(Class slotClazz) { + try { + Slot slot = slotClazz.newInstance(); + Integer slotIndex = QUEUE.poll(); + if (ObjectUtil.isNotNull(slotIndex) && SLOTS.compareAndSet(slotIndex, null, slot)) { + OCCUPY_COUNT.incrementAndGet(); + return slotIndex; } - }catch(Exception e){ - LOG.error("offer slot error",e); + } catch (Exception e) { + LOG.error("offer slot error", e); return -1; } return -1; @@ -57,13 +64,14 @@ public class DataBus { @SuppressWarnings("unchecked") public static T getSlot(int slotIndex){ - return (T)slots[slotIndex]; + return (T)SLOTS.get(slotIndex); } public static void releaseSlot(int slotIndex){ - if(ObjectUtil.isNotNull(slots[slotIndex])){ - LOG.info("[{}]:slot[{}] released",slots[slotIndex].getRequestId(),slotIndex); - slots[slotIndex] = null; + if(ObjectUtil.isNotNull(SLOTS.get(slotIndex))){ + LOG.info("[{}]:slot[{}] released",SLOTS.get(slotIndex).getRequestId(),slotIndex); + SLOTS.set(slotIndex, null); + QUEUE.add(slotIndex); OCCUPY_COUNT.decrementAndGet(); }else{ LOG.warn("slot[{}] already has been released",slotIndex); diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/Node.java b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/Node.java index 705196eb5..5ebf1f45e 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/Node.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/Node.java @@ -36,7 +36,7 @@ public class Node implements Executable{ private NodeComponent instance; - private Map condNodeMap = new HashMap(); + private final Map condNodeMap = new HashMap<>(); public Node(){ diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/FlowBus.java b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/FlowBus.java index 0153b019d..f29114139 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/FlowBus.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/FlowBus.java @@ -1,6 +1,7 @@ /** *

Title: liteflow

*

Description: 轻量级的组件式流程框架

+ * * @author Bryan.Zhang * @email weenyc31@163.com * @Date 2020/4/1 @@ -13,7 +14,11 @@ import cn.hutool.core.util.StrUtil; import com.yomahub.liteflow.core.NodeComponent; import com.yomahub.liteflow.entity.flow.Chain; import com.yomahub.liteflow.entity.flow.Node; +import com.yomahub.liteflow.enums.FlowParserTypeEnum; import com.yomahub.liteflow.exception.ComponentCannotRegisterException; +import com.yomahub.liteflow.parser.LocalJsonFlowParser; +import com.yomahub.liteflow.parser.LocalXmlFlowParser; +import com.yomahub.liteflow.parser.LocalYmlFlowParser; import com.yomahub.liteflow.util.SpringAware; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -27,75 +32,85 @@ import java.util.Map; */ public class FlowBus { - private static final Logger LOG = LoggerFactory.getLogger(FlowBus.class); + private static final Logger LOG = LoggerFactory.getLogger(FlowBus.class); - private static final Map chainMap = new HashMap<>(); + private static final Map chainMap = new HashMap<>(); - private static final Map nodeMap = new HashMap<>(); - - private FlowBus() { - } - - public static Chain getChain(String id) throws Exception { - if (MapUtil.isEmpty(chainMap)) { - throw new Exception("please config the rule first"); - } - return chainMap.get(id); - } + private static final Map nodeMap = new HashMap<>(); - public static void addChain(String name,Chain chain){ - chainMap.put(name, chain); - } + private FlowBus() { + } - public static boolean containChain(String chainId){ - return chainMap.containsKey(chainId); - } + public static Chain getChain(String id) throws Exception { + if (MapUtil.isEmpty(chainMap)) { + throw new Exception("please config the rule first"); + } + return chainMap.get(id); + } - public static boolean needInit() { - return MapUtil.isEmpty(chainMap); - } + public static void addChain(String name, Chain chain) { + chainMap.put(name, chain); + } - public static boolean containNode(String nodeId) { - return nodeMap.containsKey(nodeId); - } + public static boolean containChain(String chainId) { + return chainMap.containsKey(chainId); + } - public static void addNode(String nodeId, Node node) { - if (containNode(nodeId)) return; - nodeMap.put(nodeId, node); - } + public static boolean needInit() { + return MapUtil.isEmpty(chainMap); + } - public static void addNode(String nodeId, String cmpClazzStr) throws Exception{ - if (containNode(nodeId)) return; - Class cmpClazz = (Class)Class.forName(cmpClazzStr); - addNode(nodeId, cmpClazz); - } + public static boolean containNode(String nodeId) { + return nodeMap.containsKey(nodeId); + } - public static void addNode(String nodeId, Class cmpClazz){ - if (containNode(nodeId)) return; - try{ - //以node方式配置,本质上是为了适配无spring的环境,如果有spring环境,其实不用这么配置 - //这里的逻辑是判断是否能从spring上下文中取到,如果没有spring,则就是new instance了 - NodeComponent cmpInstance = SpringAware.registerOrGet(cmpClazz); - if (ObjectUtil.isNull(cmpInstance)) { - LOG.warn("couldn't find component class [{}] from spring context", cmpClazz.getName()); - cmpInstance = cmpClazz.newInstance(); - } - cmpInstance.setNodeId(nodeId); - cmpInstance.setSelf(cmpInstance); - nodeMap.put(nodeId, new Node(nodeId, cmpClazz.getName(), cmpInstance)); - }catch (Exception e){ - String error = StrUtil.format("component[{}] register error", cmpClazz.getName()); - LOG.error(error, e); - throw new ComponentCannotRegisterException(error); - } - } + public static void addNode(String nodeId, Node node) { + if (containNode(nodeId)) return; + nodeMap.put(nodeId, node); + } - public static Node getNode(String nodeId) { - return nodeMap.get(nodeId); - } + public static void addNode(String nodeId, String cmpClazzStr) throws Exception { + if (containNode(nodeId)) return; + Class cmpClazz = (Class) Class.forName(cmpClazzStr); + addNode(nodeId, cmpClazz); + } - public static void cleanCache(){ - chainMap.clear(); - nodeMap.clear(); - } + public static void addNode(String nodeId, Class cmpClazz) { + if (containNode(nodeId)) return; + try { + //以node方式配置,本质上是为了适配无spring的环境,如果有spring环境,其实不用这么配置 + //这里的逻辑是判断是否能从spring上下文中取到,如果没有spring,则就是new instance了 + NodeComponent cmpInstance = SpringAware.registerOrGet(cmpClazz); + if (ObjectUtil.isNull(cmpInstance)) { + LOG.warn("couldn't find component class [{}] from spring context", cmpClazz.getName()); + cmpInstance = cmpClazz.newInstance(); + } + cmpInstance.setNodeId(nodeId); + cmpInstance.setSelf(cmpInstance); + nodeMap.put(nodeId, new Node(nodeId, cmpClazz.getName(), cmpInstance)); + } catch (Exception e) { + String error = StrUtil.format("component[{}] register error", cmpClazz.getName()); + LOG.error(error, e); + throw new ComponentCannotRegisterException(error); + } + } + + public static Node getNode(String nodeId) { + return nodeMap.get(nodeId); + } + + public static void cleanCache() { + chainMap.clear(); + nodeMap.clear(); + } + + public static void refreshFlowMetaData(FlowParserTypeEnum type, String content) throws Exception { + if (type.equals(FlowParserTypeEnum.TYPE_XML)) { + new LocalXmlFlowParser().parse(content); + } else if (type.equals(FlowParserTypeEnum.TYPE_JSON)) { + new LocalJsonFlowParser().parse(content); + } else if (type.equals(FlowParserTypeEnum.TYPE_YML)) { + new LocalYmlFlowParser().parse(content); + } + } } diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/parser/FlowParser.java b/liteflow-core/src/main/java/com/yomahub/liteflow/parser/FlowParser.java index 7d89d3443..537a0cfa9 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/parser/FlowParser.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/parser/FlowParser.java @@ -29,7 +29,7 @@ public abstract class FlowParser { public abstract void parse(String content) throws Exception ; //条件节点的正则解析 - public static RegexEntity parseNodeStr(String str) { + public RegexEntity parseNodeStr(String str) { List list = new ArrayList(); Pattern p = Pattern.compile("[^\\)\\(]+"); Matcher m = p.matcher(str); diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/parser/XmlFlowParser.java b/liteflow-core/src/main/java/com/yomahub/liteflow/parser/XmlFlowParser.java index fdab15e00..e7c4d1f08 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/parser/XmlFlowParser.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/parser/XmlFlowParser.java @@ -9,6 +9,7 @@ import com.yomahub.liteflow.entity.flow.Condition; import com.yomahub.liteflow.entity.flow.Executable; import com.yomahub.liteflow.entity.flow.Node; import com.yomahub.liteflow.exception.ExecutableItemNotFoundException; +import com.yomahub.liteflow.exception.ParseException; import com.yomahub.liteflow.flow.FlowBus; import com.yomahub.liteflow.spring.ComponentScanner; import com.yomahub.liteflow.util.SpringAware; @@ -57,7 +58,9 @@ public abstract class XmlFlowParser extends FlowParser{ } } else { for (Entry componentEntry : ComponentScanner.nodeComponentMap.entrySet()) { - FlowBus.addNode(componentEntry.getKey(), new Node(componentEntry.getKey(), componentEntry.getValue().getClass().getName(), componentEntry.getValue())); + if (!FlowBus.containNode(componentEntry.getKey())){ + FlowBus.addNode(componentEntry.getKey(), new Node(componentEntry.getKey(), componentEntry.getValue().getClass().getName(), componentEntry.getValue())); + } } } @@ -67,8 +70,9 @@ public abstract class XmlFlowParser extends FlowParser{ parseOneChain(e); } } catch (Exception e) { - LOG.error("FlowParser parser exception", e); - throw e; + String errorMsg = "FlowParser parser exception"; + LOG.error(errorMsg, e); + throw new ParseException(errorMsg); } } diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/parser/ZookeeperXmlFlowParser.java b/liteflow-core/src/main/java/com/yomahub/liteflow/parser/ZookeeperXmlFlowParser.java index a5ac76a7d..5496c10c6 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/parser/ZookeeperXmlFlowParser.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/parser/ZookeeperXmlFlowParser.java @@ -4,6 +4,7 @@ import java.text.MessageFormat; import cn.hutool.core.util.StrUtil; +import com.yomahub.liteflow.flow.FlowBus; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.cache.NodeCache; @@ -57,13 +58,11 @@ public class ZookeeperXmlFlowParser extends XmlFlowParser{ final NodeCache cache = new NodeCache(client,nodePath); cache.start(); - cache.getListenable().addListener(new NodeCacheListener() { - @Override - public void nodeChanged() throws Exception { - String content = new String(cache.getCurrentData().getData()); - LOG.info("stating load flow config...."); - parse(content); - } + cache.getListenable().addListener(() -> { + String content1 = new String(cache.getCurrentData().getData()); + LOG.info("stating load flow config...."); + FlowBus.cleanCache(); + parse(content1); }); } } diff --git a/liteflow-spring-boot-starter/pom.xml b/liteflow-spring-boot-starter/pom.xml index ae8f68100..96f36a864 100644 --- a/liteflow-spring-boot-starter/pom.xml +++ b/liteflow-spring-boot-starter/pom.xml @@ -10,7 +10,7 @@ liteflow com.yomahub - 2.5.5 + 2.5.6 diff --git a/liteflow-test-spring/pom.xml b/liteflow-test-spring/pom.xml index e24e07f74..1b5d89200 100644 --- a/liteflow-test-spring/pom.xml +++ b/liteflow-test-spring/pom.xml @@ -9,7 +9,7 @@ liteflow com.yomahub - 2.5.5 + 2.5.6 diff --git a/liteflow-test-springboot/pom.xml b/liteflow-test-springboot/pom.xml index 9899d523a..734c6e906 100644 --- a/liteflow-test-springboot/pom.xml +++ b/liteflow-test-springboot/pom.xml @@ -9,7 +9,7 @@ liteflow com.yomahub - 2.5.5 + 2.5.6 diff --git a/pom.xml b/pom.xml index 21289929f..233d7ffb1 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ com.yomahub liteflow pom - 2.5.5 + 2.5.6 liteflow a lightweight and practical micro-process framework https://github.com/bryan31/liteflow