From f1020b09e846f0580dff731feaf1fc1ae94efa0b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E5=A4=A7=E9=94=A4?= Date: Wed, 19 May 2021 15:45:42 +0800 Subject: [PATCH 1/7] =?UTF-8?q?=E6=80=A7=E8=83=BD=E4=BC=98=E5=8C=96=20Data?= =?UTF-8?q?Bus=20Lock=20Free?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../yomahub/liteflow/entity/data/DataBus.java | 45 +++++++++++-------- 1 file changed, 26 insertions(+), 19 deletions(-) diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/entity/data/DataBus.java b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/data/DataBus.java index 5e072fad4..422ceb86a 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/entity/data/DataBus.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/data/DataBus.java @@ -7,15 +7,18 @@ */ package com.yomahub.liteflow.entity.data; -import java.util.concurrent.atomic.AtomicInteger; - import cn.hutool.core.util.ObjectUtil; -import com.yomahub.liteflow.exception.ConfigErrorException; import com.yomahub.liteflow.property.LiteflowConfig; import com.yomahub.liteflow.util.SpringAware; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReferenceArray; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + /** * 数据BUS,主要用来管理Slot,用以分配和回收 * @author Bryan.Zhang @@ -26,7 +29,9 @@ public class DataBus { public static AtomicInteger OCCUPY_COUNT = new AtomicInteger(0); - private static Slot[] slots; + private static AtomicReferenceArray SLOTS; + + private static ConcurrentLinkedQueue QUEUE; static { LiteflowConfig liteflowConfig = SpringAware.getBean(LiteflowConfig.class); @@ -36,20 +41,22 @@ public class DataBus { liteflowConfig = new LiteflowConfig(); } int slotSize = liteflowConfig.getSlotSize(); - slots = new Slot[slotSize]; + + SLOTS = new AtomicReferenceArray<>(slotSize); + + QUEUE = IntStream.range(0, slotSize - 1).boxed().collect(Collectors.toCollection(ConcurrentLinkedQueue::new)); } - public synchronized static int offerSlot(Class slotClazz){ - try{ - for(int i = 0; i < slots.length; i++){ - if(ObjectUtil.isNull(slots[i])){ - slots[i] = slotClazz.newInstance(); - OCCUPY_COUNT.incrementAndGet(); - return i; - } + public synchronized static int offerSlot(Class slotClazz) { + try { + Slot slot = slotClazz.newInstance(); + Integer slotIndex = QUEUE.poll(); + if (ObjectUtil.isNotNull(slotIndex) && SLOTS.compareAndSet(slotIndex, null, slot)) { + OCCUPY_COUNT.incrementAndGet(); + return slotIndex; } - }catch(Exception e){ - LOG.error("offer slot error",e); + } catch (Exception e) { + LOG.error("offer slot error", e); return -1; } return -1; @@ -57,13 +64,13 @@ public class DataBus { @SuppressWarnings("unchecked") public static T getSlot(int slotIndex){ - return (T)slots[slotIndex]; + return (T)SLOTS.get(slotIndex); } public static void releaseSlot(int slotIndex){ - if(ObjectUtil.isNotNull(slots[slotIndex])){ - LOG.info("[{}]:slot[{}] released",slots[slotIndex].getRequestId(),slotIndex); - slots[slotIndex] = null; + if(ObjectUtil.isNotNull(SLOTS.get(slotIndex))){ + LOG.info("[{}]:slot[{}] released",SLOTS.get(slotIndex).getRequestId(),slotIndex); + SLOTS.set(slotIndex, null); OCCUPY_COUNT.decrementAndGet(); }else{ LOG.warn("slot[{}] already has been released",slotIndex); From 6f5adc0675b30fae233c3cb526f1e5fd9901e86a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E5=A4=A7=E9=94=A4?= Date: Wed, 19 May 2021 16:10:44 +0800 Subject: [PATCH 2/7] =?UTF-8?q?DataBus=20=E6=80=A7=E8=83=BD=E4=BC=98?= =?UTF-8?q?=E5=8C=96=20Lock=20Free?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/main/java/com/yomahub/liteflow/entity/data/DataBus.java | 1 + 1 file changed, 1 insertion(+) diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/entity/data/DataBus.java b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/data/DataBus.java index 422ceb86a..ccc87bfe1 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/entity/data/DataBus.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/data/DataBus.java @@ -71,6 +71,7 @@ public class DataBus { if(ObjectUtil.isNotNull(SLOTS.get(slotIndex))){ LOG.info("[{}]:slot[{}] released",SLOTS.get(slotIndex).getRequestId(),slotIndex); SLOTS.set(slotIndex, null); + QUEUE.add(slotIndex); OCCUPY_COUNT.decrementAndGet(); }else{ LOG.warn("slot[{}] already has been released",slotIndex); From aebc5dfeea9d27a58019f2ac56d4c1c72be9a3ee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E5=A4=A7=E9=94=A4?= Date: Wed, 19 May 2021 17:45:52 +0800 Subject: [PATCH 3/7] =?UTF-8?q?=E8=A1=A5=E5=85=85=E6=8F=90=E4=BA=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/main/java/com/yomahub/liteflow/entity/data/DataBus.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/entity/data/DataBus.java b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/data/DataBus.java index ccc87bfe1..8051fc0bd 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/entity/data/DataBus.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/data/DataBus.java @@ -47,7 +47,7 @@ public class DataBus { QUEUE = IntStream.range(0, slotSize - 1).boxed().collect(Collectors.toCollection(ConcurrentLinkedQueue::new)); } - public synchronized static int offerSlot(Class slotClazz) { + public static int offerSlot(Class slotClazz) { try { Slot slot = slotClazz.newInstance(); Integer slotIndex = QUEUE.poll(); From 14ce4a6a5116578c8fda27108cee807bd0fa3a79 Mon Sep 17 00:00:00 2001 From: bryan31 Date: Thu, 20 May 2021 00:53:10 +0800 Subject: [PATCH 4/7] =?UTF-8?q?=E4=BC=98=E5=8C=96=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../liteflow/parser/ZookeeperXmlFlowParser.java | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/parser/ZookeeperXmlFlowParser.java b/liteflow-core/src/main/java/com/yomahub/liteflow/parser/ZookeeperXmlFlowParser.java index a5ac76a7d..5496c10c6 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/parser/ZookeeperXmlFlowParser.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/parser/ZookeeperXmlFlowParser.java @@ -4,6 +4,7 @@ import java.text.MessageFormat; import cn.hutool.core.util.StrUtil; +import com.yomahub.liteflow.flow.FlowBus; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.cache.NodeCache; @@ -57,13 +58,11 @@ public class ZookeeperXmlFlowParser extends XmlFlowParser{ final NodeCache cache = new NodeCache(client,nodePath); cache.start(); - cache.getListenable().addListener(new NodeCacheListener() { - @Override - public void nodeChanged() throws Exception { - String content = new String(cache.getCurrentData().getData()); - LOG.info("stating load flow config...."); - parse(content); - } + cache.getListenable().addListener(() -> { + String content1 = new String(cache.getCurrentData().getData()); + LOG.info("stating load flow config...."); + FlowBus.cleanCache(); + parse(content1); }); } } From b911d464026a80da88127fac2301f229809e8b54 Mon Sep 17 00:00:00 2001 From: bryan31 Date: Fri, 21 May 2021 14:08:14 +0800 Subject: [PATCH 5/7] =?UTF-8?q?enhancement=20#I3S5G8=20=E5=8A=A8=E6=80=81?= =?UTF-8?q?=E5=B9=B3=E6=BB=91=E5=88=B7=E6=96=B0=E6=9C=BA=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/yomahub/liteflow/flow/FlowBus.java | 135 ++++++++++-------- .../yomahub/liteflow/parser/FlowParser.java | 2 +- .../liteflow/parser/XmlFlowParser.java | 6 +- 3 files changed, 80 insertions(+), 63 deletions(-) diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/FlowBus.java b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/FlowBus.java index 0153b019d..f29114139 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/FlowBus.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/FlowBus.java @@ -1,6 +1,7 @@ /** *

Title: liteflow

*

Description: 轻量级的组件式流程框架

+ * * @author Bryan.Zhang * @email weenyc31@163.com * @Date 2020/4/1 @@ -13,7 +14,11 @@ import cn.hutool.core.util.StrUtil; import com.yomahub.liteflow.core.NodeComponent; import com.yomahub.liteflow.entity.flow.Chain; import com.yomahub.liteflow.entity.flow.Node; +import com.yomahub.liteflow.enums.FlowParserTypeEnum; import com.yomahub.liteflow.exception.ComponentCannotRegisterException; +import com.yomahub.liteflow.parser.LocalJsonFlowParser; +import com.yomahub.liteflow.parser.LocalXmlFlowParser; +import com.yomahub.liteflow.parser.LocalYmlFlowParser; import com.yomahub.liteflow.util.SpringAware; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -27,75 +32,85 @@ import java.util.Map; */ public class FlowBus { - private static final Logger LOG = LoggerFactory.getLogger(FlowBus.class); + private static final Logger LOG = LoggerFactory.getLogger(FlowBus.class); - private static final Map chainMap = new HashMap<>(); + private static final Map chainMap = new HashMap<>(); - private static final Map nodeMap = new HashMap<>(); - - private FlowBus() { - } - - public static Chain getChain(String id) throws Exception { - if (MapUtil.isEmpty(chainMap)) { - throw new Exception("please config the rule first"); - } - return chainMap.get(id); - } + private static final Map nodeMap = new HashMap<>(); - public static void addChain(String name,Chain chain){ - chainMap.put(name, chain); - } + private FlowBus() { + } - public static boolean containChain(String chainId){ - return chainMap.containsKey(chainId); - } + public static Chain getChain(String id) throws Exception { + if (MapUtil.isEmpty(chainMap)) { + throw new Exception("please config the rule first"); + } + return chainMap.get(id); + } - public static boolean needInit() { - return MapUtil.isEmpty(chainMap); - } + public static void addChain(String name, Chain chain) { + chainMap.put(name, chain); + } - public static boolean containNode(String nodeId) { - return nodeMap.containsKey(nodeId); - } + public static boolean containChain(String chainId) { + return chainMap.containsKey(chainId); + } - public static void addNode(String nodeId, Node node) { - if (containNode(nodeId)) return; - nodeMap.put(nodeId, node); - } + public static boolean needInit() { + return MapUtil.isEmpty(chainMap); + } - public static void addNode(String nodeId, String cmpClazzStr) throws Exception{ - if (containNode(nodeId)) return; - Class cmpClazz = (Class)Class.forName(cmpClazzStr); - addNode(nodeId, cmpClazz); - } + public static boolean containNode(String nodeId) { + return nodeMap.containsKey(nodeId); + } - public static void addNode(String nodeId, Class cmpClazz){ - if (containNode(nodeId)) return; - try{ - //以node方式配置,本质上是为了适配无spring的环境,如果有spring环境,其实不用这么配置 - //这里的逻辑是判断是否能从spring上下文中取到,如果没有spring,则就是new instance了 - NodeComponent cmpInstance = SpringAware.registerOrGet(cmpClazz); - if (ObjectUtil.isNull(cmpInstance)) { - LOG.warn("couldn't find component class [{}] from spring context", cmpClazz.getName()); - cmpInstance = cmpClazz.newInstance(); - } - cmpInstance.setNodeId(nodeId); - cmpInstance.setSelf(cmpInstance); - nodeMap.put(nodeId, new Node(nodeId, cmpClazz.getName(), cmpInstance)); - }catch (Exception e){ - String error = StrUtil.format("component[{}] register error", cmpClazz.getName()); - LOG.error(error, e); - throw new ComponentCannotRegisterException(error); - } - } + public static void addNode(String nodeId, Node node) { + if (containNode(nodeId)) return; + nodeMap.put(nodeId, node); + } - public static Node getNode(String nodeId) { - return nodeMap.get(nodeId); - } + public static void addNode(String nodeId, String cmpClazzStr) throws Exception { + if (containNode(nodeId)) return; + Class cmpClazz = (Class) Class.forName(cmpClazzStr); + addNode(nodeId, cmpClazz); + } - public static void cleanCache(){ - chainMap.clear(); - nodeMap.clear(); - } + public static void addNode(String nodeId, Class cmpClazz) { + if (containNode(nodeId)) return; + try { + //以node方式配置,本质上是为了适配无spring的环境,如果有spring环境,其实不用这么配置 + //这里的逻辑是判断是否能从spring上下文中取到,如果没有spring,则就是new instance了 + NodeComponent cmpInstance = SpringAware.registerOrGet(cmpClazz); + if (ObjectUtil.isNull(cmpInstance)) { + LOG.warn("couldn't find component class [{}] from spring context", cmpClazz.getName()); + cmpInstance = cmpClazz.newInstance(); + } + cmpInstance.setNodeId(nodeId); + cmpInstance.setSelf(cmpInstance); + nodeMap.put(nodeId, new Node(nodeId, cmpClazz.getName(), cmpInstance)); + } catch (Exception e) { + String error = StrUtil.format("component[{}] register error", cmpClazz.getName()); + LOG.error(error, e); + throw new ComponentCannotRegisterException(error); + } + } + + public static Node getNode(String nodeId) { + return nodeMap.get(nodeId); + } + + public static void cleanCache() { + chainMap.clear(); + nodeMap.clear(); + } + + public static void refreshFlowMetaData(FlowParserTypeEnum type, String content) throws Exception { + if (type.equals(FlowParserTypeEnum.TYPE_XML)) { + new LocalXmlFlowParser().parse(content); + } else if (type.equals(FlowParserTypeEnum.TYPE_JSON)) { + new LocalJsonFlowParser().parse(content); + } else if (type.equals(FlowParserTypeEnum.TYPE_YML)) { + new LocalYmlFlowParser().parse(content); + } + } } diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/parser/FlowParser.java b/liteflow-core/src/main/java/com/yomahub/liteflow/parser/FlowParser.java index 7d89d3443..537a0cfa9 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/parser/FlowParser.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/parser/FlowParser.java @@ -29,7 +29,7 @@ public abstract class FlowParser { public abstract void parse(String content) throws Exception ; //条件节点的正则解析 - public static RegexEntity parseNodeStr(String str) { + public RegexEntity parseNodeStr(String str) { List list = new ArrayList(); Pattern p = Pattern.compile("[^\\)\\(]+"); Matcher m = p.matcher(str); diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/parser/XmlFlowParser.java b/liteflow-core/src/main/java/com/yomahub/liteflow/parser/XmlFlowParser.java index fdab15e00..5af609625 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/parser/XmlFlowParser.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/parser/XmlFlowParser.java @@ -9,6 +9,7 @@ import com.yomahub.liteflow.entity.flow.Condition; import com.yomahub.liteflow.entity.flow.Executable; import com.yomahub.liteflow.entity.flow.Node; import com.yomahub.liteflow.exception.ExecutableItemNotFoundException; +import com.yomahub.liteflow.exception.ParseException; import com.yomahub.liteflow.flow.FlowBus; import com.yomahub.liteflow.spring.ComponentScanner; import com.yomahub.liteflow.util.SpringAware; @@ -67,8 +68,9 @@ public abstract class XmlFlowParser extends FlowParser{ parseOneChain(e); } } catch (Exception e) { - LOG.error("FlowParser parser exception", e); - throw e; + String errorMsg = "FlowParser parser exception"; + LOG.error(errorMsg, e); + throw new ParseException(errorMsg); } } From f73f42aac573fe4ea6a775bdf2ef3f17cc8e554c Mon Sep 17 00:00:00 2001 From: bryan31 Date: Fri, 21 May 2021 16:26:13 +0800 Subject: [PATCH 6/7] =?UTF-8?q?=E4=BB=A3=E7=A0=81=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/main/java/com/yomahub/liteflow/entity/flow/Node.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/Node.java b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/Node.java index 705196eb5..5ebf1f45e 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/Node.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/Node.java @@ -36,7 +36,7 @@ public class Node implements Executable{ private NodeComponent instance; - private Map condNodeMap = new HashMap(); + private final Map condNodeMap = new HashMap<>(); public Node(){ From f462ca1e031dea337639108ef61bf723abe44b04 Mon Sep 17 00:00:00 2001 From: bryan31 Date: Fri, 21 May 2021 19:34:09 +0800 Subject: [PATCH 7/7] bug #I3SFOO https://gitee.com/dromara/liteFlow/issues/I3SFOO --- liteflow-core/pom.xml | 2 +- .../main/java/com/yomahub/liteflow/parser/XmlFlowParser.java | 4 +++- liteflow-spring-boot-starter/pom.xml | 2 +- liteflow-test-spring/pom.xml | 2 +- liteflow-test-springboot/pom.xml | 2 +- pom.xml | 2 +- 6 files changed, 8 insertions(+), 6 deletions(-) diff --git a/liteflow-core/pom.xml b/liteflow-core/pom.xml index 745417a84..04ab79e44 100644 --- a/liteflow-core/pom.xml +++ b/liteflow-core/pom.xml @@ -9,7 +9,7 @@ com.yomahub liteflow - 2.5.5 + 2.5.6 diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/parser/XmlFlowParser.java b/liteflow-core/src/main/java/com/yomahub/liteflow/parser/XmlFlowParser.java index 5af609625..e7c4d1f08 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/parser/XmlFlowParser.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/parser/XmlFlowParser.java @@ -58,7 +58,9 @@ public abstract class XmlFlowParser extends FlowParser{ } } else { for (Entry componentEntry : ComponentScanner.nodeComponentMap.entrySet()) { - FlowBus.addNode(componentEntry.getKey(), new Node(componentEntry.getKey(), componentEntry.getValue().getClass().getName(), componentEntry.getValue())); + if (!FlowBus.containNode(componentEntry.getKey())){ + FlowBus.addNode(componentEntry.getKey(), new Node(componentEntry.getKey(), componentEntry.getValue().getClass().getName(), componentEntry.getValue())); + } } } diff --git a/liteflow-spring-boot-starter/pom.xml b/liteflow-spring-boot-starter/pom.xml index ae8f68100..96f36a864 100644 --- a/liteflow-spring-boot-starter/pom.xml +++ b/liteflow-spring-boot-starter/pom.xml @@ -10,7 +10,7 @@ liteflow com.yomahub - 2.5.5 + 2.5.6 diff --git a/liteflow-test-spring/pom.xml b/liteflow-test-spring/pom.xml index e24e07f74..1b5d89200 100644 --- a/liteflow-test-spring/pom.xml +++ b/liteflow-test-spring/pom.xml @@ -9,7 +9,7 @@ liteflow com.yomahub - 2.5.5 + 2.5.6 diff --git a/liteflow-test-springboot/pom.xml b/liteflow-test-springboot/pom.xml index 9899d523a..734c6e906 100644 --- a/liteflow-test-springboot/pom.xml +++ b/liteflow-test-springboot/pom.xml @@ -9,7 +9,7 @@ liteflow com.yomahub - 2.5.5 + 2.5.6 diff --git a/pom.xml b/pom.xml index 21289929f..233d7ffb1 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ com.yomahub liteflow pom - 2.5.5 + 2.5.6 liteflow a lightweight and practical micro-process framework https://github.com/bryan31/liteflow