Merge branch 'v2.5.6'

This commit is contained in:
bryan31
2021-05-21 19:35:53 +08:00
11 changed files with 122 additions and 96 deletions

View File

@@ -9,7 +9,7 @@
<parent>
<groupId>com.yomahub</groupId>
<artifactId>liteflow</artifactId>
<version>2.5.5</version>
<version>2.5.6</version>
</parent>
<dependencies>

View File

@@ -7,15 +7,18 @@
*/
package com.yomahub.liteflow.entity.data;
import java.util.concurrent.atomic.AtomicInteger;
import cn.hutool.core.util.ObjectUtil;
import com.yomahub.liteflow.exception.ConfigErrorException;
import com.yomahub.liteflow.property.LiteflowConfig;
import com.yomahub.liteflow.util.SpringAware;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
/**
* 数据BUS主要用来管理Slot用以分配和回收
* @author Bryan.Zhang
@@ -26,7 +29,9 @@ public class DataBus {
public static AtomicInteger OCCUPY_COUNT = new AtomicInteger(0);
private static Slot[] slots;
private static AtomicReferenceArray<Slot> SLOTS;
private static ConcurrentLinkedQueue<Integer> QUEUE;
static {
LiteflowConfig liteflowConfig = SpringAware.getBean(LiteflowConfig.class);
@@ -36,20 +41,22 @@ public class DataBus {
liteflowConfig = new LiteflowConfig();
}
int slotSize = liteflowConfig.getSlotSize();
slots = new Slot[slotSize];
SLOTS = new AtomicReferenceArray<>(slotSize);
QUEUE = IntStream.range(0, slotSize - 1).boxed().collect(Collectors.toCollection(ConcurrentLinkedQueue::new));
}
public synchronized static int offerSlot(Class<? extends Slot> slotClazz){
try{
for(int i = 0; i < slots.length; i++){
if(ObjectUtil.isNull(slots[i])){
slots[i] = slotClazz.newInstance();
OCCUPY_COUNT.incrementAndGet();
return i;
}
public static int offerSlot(Class<? extends Slot> slotClazz) {
try {
Slot slot = slotClazz.newInstance();
Integer slotIndex = QUEUE.poll();
if (ObjectUtil.isNotNull(slotIndex) && SLOTS.compareAndSet(slotIndex, null, slot)) {
OCCUPY_COUNT.incrementAndGet();
return slotIndex;
}
}catch(Exception e){
LOG.error("offer slot error",e);
} catch (Exception e) {
LOG.error("offer slot error", e);
return -1;
}
return -1;
@@ -57,13 +64,14 @@ public class DataBus {
@SuppressWarnings("unchecked")
public static <T extends Slot> T getSlot(int slotIndex){
return (T)slots[slotIndex];
return (T)SLOTS.get(slotIndex);
}
public static void releaseSlot(int slotIndex){
if(ObjectUtil.isNotNull(slots[slotIndex])){
LOG.info("[{}]:slot[{}] released",slots[slotIndex].getRequestId(),slotIndex);
slots[slotIndex] = null;
if(ObjectUtil.isNotNull(SLOTS.get(slotIndex))){
LOG.info("[{}]:slot[{}] released",SLOTS.get(slotIndex).getRequestId(),slotIndex);
SLOTS.set(slotIndex, null);
QUEUE.add(slotIndex);
OCCUPY_COUNT.decrementAndGet();
}else{
LOG.warn("slot[{}] already has been released",slotIndex);

View File

@@ -36,7 +36,7 @@ public class Node implements Executable{
private NodeComponent instance;
private Map<String, Executable> condNodeMap = new HashMap<String, Executable>();
private final Map<String, Executable> condNodeMap = new HashMap<>();
public Node(){

View File

@@ -1,6 +1,7 @@
/**
* <p>Title: liteflow</p>
* <p>Description: 轻量级的组件式流程框架</p>
*
* @author Bryan.Zhang
* @email weenyc31@163.com
* @Date 2020/4/1
@@ -13,7 +14,11 @@ import cn.hutool.core.util.StrUtil;
import com.yomahub.liteflow.core.NodeComponent;
import com.yomahub.liteflow.entity.flow.Chain;
import com.yomahub.liteflow.entity.flow.Node;
import com.yomahub.liteflow.enums.FlowParserTypeEnum;
import com.yomahub.liteflow.exception.ComponentCannotRegisterException;
import com.yomahub.liteflow.parser.LocalJsonFlowParser;
import com.yomahub.liteflow.parser.LocalXmlFlowParser;
import com.yomahub.liteflow.parser.LocalYmlFlowParser;
import com.yomahub.liteflow.util.SpringAware;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -27,75 +32,85 @@ import java.util.Map;
*/
public class FlowBus {
private static final Logger LOG = LoggerFactory.getLogger(FlowBus.class);
private static final Logger LOG = LoggerFactory.getLogger(FlowBus.class);
private static final Map<String, Chain> chainMap = new HashMap<>();
private static final Map<String, Chain> chainMap = new HashMap<>();
private static final Map<String, Node> nodeMap = new HashMap<>();
private FlowBus() {
}
public static Chain getChain(String id) throws Exception {
if (MapUtil.isEmpty(chainMap)) {
throw new Exception("please config the rule first");
}
return chainMap.get(id);
}
private static final Map<String, Node> nodeMap = new HashMap<>();
public static void addChain(String name,Chain chain){
chainMap.put(name, chain);
}
private FlowBus() {
}
public static boolean containChain(String chainId){
return chainMap.containsKey(chainId);
}
public static Chain getChain(String id) throws Exception {
if (MapUtil.isEmpty(chainMap)) {
throw new Exception("please config the rule first");
}
return chainMap.get(id);
}
public static boolean needInit() {
return MapUtil.isEmpty(chainMap);
}
public static void addChain(String name, Chain chain) {
chainMap.put(name, chain);
}
public static boolean containNode(String nodeId) {
return nodeMap.containsKey(nodeId);
}
public static boolean containChain(String chainId) {
return chainMap.containsKey(chainId);
}
public static void addNode(String nodeId, Node node) {
if (containNode(nodeId)) return;
nodeMap.put(nodeId, node);
}
public static boolean needInit() {
return MapUtil.isEmpty(chainMap);
}
public static void addNode(String nodeId, String cmpClazzStr) throws Exception{
if (containNode(nodeId)) return;
Class<NodeComponent> cmpClazz = (Class<NodeComponent>)Class.forName(cmpClazzStr);
addNode(nodeId, cmpClazz);
}
public static boolean containNode(String nodeId) {
return nodeMap.containsKey(nodeId);
}
public static void addNode(String nodeId, Class<? extends NodeComponent> cmpClazz){
if (containNode(nodeId)) return;
try{
//以node方式配置本质上是为了适配无spring的环境如果有spring环境其实不用这么配置
//这里的逻辑是判断是否能从spring上下文中取到如果没有spring则就是new instance了
NodeComponent cmpInstance = SpringAware.registerOrGet(cmpClazz);
if (ObjectUtil.isNull(cmpInstance)) {
LOG.warn("couldn't find component class [{}] from spring context", cmpClazz.getName());
cmpInstance = cmpClazz.newInstance();
}
cmpInstance.setNodeId(nodeId);
cmpInstance.setSelf(cmpInstance);
nodeMap.put(nodeId, new Node(nodeId, cmpClazz.getName(), cmpInstance));
}catch (Exception e){
String error = StrUtil.format("component[{}] register error", cmpClazz.getName());
LOG.error(error, e);
throw new ComponentCannotRegisterException(error);
}
}
public static void addNode(String nodeId, Node node) {
if (containNode(nodeId)) return;
nodeMap.put(nodeId, node);
}
public static Node getNode(String nodeId) {
return nodeMap.get(nodeId);
}
public static void addNode(String nodeId, String cmpClazzStr) throws Exception {
if (containNode(nodeId)) return;
Class<NodeComponent> cmpClazz = (Class<NodeComponent>) Class.forName(cmpClazzStr);
addNode(nodeId, cmpClazz);
}
public static void cleanCache(){
chainMap.clear();
nodeMap.clear();
}
public static void addNode(String nodeId, Class<? extends NodeComponent> cmpClazz) {
if (containNode(nodeId)) return;
try {
//以node方式配置本质上是为了适配无spring的环境如果有spring环境其实不用这么配置
//这里的逻辑是判断是否能从spring上下文中取到如果没有spring则就是new instance了
NodeComponent cmpInstance = SpringAware.registerOrGet(cmpClazz);
if (ObjectUtil.isNull(cmpInstance)) {
LOG.warn("couldn't find component class [{}] from spring context", cmpClazz.getName());
cmpInstance = cmpClazz.newInstance();
}
cmpInstance.setNodeId(nodeId);
cmpInstance.setSelf(cmpInstance);
nodeMap.put(nodeId, new Node(nodeId, cmpClazz.getName(), cmpInstance));
} catch (Exception e) {
String error = StrUtil.format("component[{}] register error", cmpClazz.getName());
LOG.error(error, e);
throw new ComponentCannotRegisterException(error);
}
}
public static Node getNode(String nodeId) {
return nodeMap.get(nodeId);
}
public static void cleanCache() {
chainMap.clear();
nodeMap.clear();
}
public static void refreshFlowMetaData(FlowParserTypeEnum type, String content) throws Exception {
if (type.equals(FlowParserTypeEnum.TYPE_XML)) {
new LocalXmlFlowParser().parse(content);
} else if (type.equals(FlowParserTypeEnum.TYPE_JSON)) {
new LocalJsonFlowParser().parse(content);
} else if (type.equals(FlowParserTypeEnum.TYPE_YML)) {
new LocalYmlFlowParser().parse(content);
}
}
}

View File

@@ -29,7 +29,7 @@ public abstract class FlowParser {
public abstract void parse(String content) throws Exception ;
//条件节点的正则解析
public static RegexEntity parseNodeStr(String str) {
public RegexEntity parseNodeStr(String str) {
List<String> list = new ArrayList<String>();
Pattern p = Pattern.compile("[^\\)\\(]+");
Matcher m = p.matcher(str);

View File

@@ -9,6 +9,7 @@ import com.yomahub.liteflow.entity.flow.Condition;
import com.yomahub.liteflow.entity.flow.Executable;
import com.yomahub.liteflow.entity.flow.Node;
import com.yomahub.liteflow.exception.ExecutableItemNotFoundException;
import com.yomahub.liteflow.exception.ParseException;
import com.yomahub.liteflow.flow.FlowBus;
import com.yomahub.liteflow.spring.ComponentScanner;
import com.yomahub.liteflow.util.SpringAware;
@@ -57,7 +58,9 @@ public abstract class XmlFlowParser extends FlowParser{
}
} else {
for (Entry<String, NodeComponent> componentEntry : ComponentScanner.nodeComponentMap.entrySet()) {
FlowBus.addNode(componentEntry.getKey(), new Node(componentEntry.getKey(), componentEntry.getValue().getClass().getName(), componentEntry.getValue()));
if (!FlowBus.containNode(componentEntry.getKey())){
FlowBus.addNode(componentEntry.getKey(), new Node(componentEntry.getKey(), componentEntry.getValue().getClass().getName(), componentEntry.getValue()));
}
}
}
@@ -67,8 +70,9 @@ public abstract class XmlFlowParser extends FlowParser{
parseOneChain(e);
}
} catch (Exception e) {
LOG.error("FlowParser parser exception", e);
throw e;
String errorMsg = "FlowParser parser exception";
LOG.error(errorMsg, e);
throw new ParseException(errorMsg);
}
}

View File

@@ -4,6 +4,7 @@ import java.text.MessageFormat;
import cn.hutool.core.util.StrUtil;
import com.yomahub.liteflow.flow.FlowBus;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.NodeCache;
@@ -57,13 +58,11 @@ public class ZookeeperXmlFlowParser extends XmlFlowParser{
final NodeCache cache = new NodeCache(client,nodePath);
cache.start();
cache.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
String content = new String(cache.getCurrentData().getData());
LOG.info("stating load flow config....");
parse(content);
}
cache.getListenable().addListener(() -> {
String content1 = new String(cache.getCurrentData().getData());
LOG.info("stating load flow config....");
FlowBus.cleanCache();
parse(content1);
});
}
}

View File

@@ -10,7 +10,7 @@
<parent>
<artifactId>liteflow</artifactId>
<groupId>com.yomahub</groupId>
<version>2.5.5</version>
<version>2.5.6</version>
</parent>
<dependencies>

View File

@@ -9,7 +9,7 @@
<parent>
<artifactId>liteflow</artifactId>
<groupId>com.yomahub</groupId>
<version>2.5.5</version>
<version>2.5.6</version>
</parent>
<dependencies>

View File

@@ -9,7 +9,7 @@
<parent>
<artifactId>liteflow</artifactId>
<groupId>com.yomahub</groupId>
<version>2.5.5</version>
<version>2.5.6</version>
</parent>
<dependencyManagement>

View File

@@ -5,7 +5,7 @@
<groupId>com.yomahub</groupId>
<artifactId>liteflow</artifactId>
<packaging>pom</packaging>
<version>2.5.5</version>
<version>2.5.6</version>
<name>liteflow</name>
<description>a lightweight and practical micro-process framework</description>
<url>https://github.com/bryan31/liteflow</url>