增加了对zk的支持

This commit is contained in:
bryan.zhang
2018-02-26 20:48:32 +08:00
parent bf9cd9fb76
commit 38f6addfd6
15 changed files with 513 additions and 79 deletions

View File

@@ -21,6 +21,7 @@
<slf4j.version>1.7.13</slf4j.version>
<fastjson.version>1.2.7</fastjson.version>
<dom4j.version>1.6.1</dom4j.version>
<curator.version>2.11.1</curator.version>
<junit.version>4.12</junit.version>
</properties>
@@ -100,6 +101,11 @@
<artifactId>commons-logging</artifactId>
<version>${commons-logging.version}</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>${curator.version}</version>
</dependency>
</dependencies>
<build>

View File

@@ -13,6 +13,8 @@ import java.text.MessageFormat;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
@@ -27,12 +29,14 @@ import com.thebeastshop.liteflow.entity.data.DataBus;
import com.thebeastshop.liteflow.entity.data.DefaultSlot;
import com.thebeastshop.liteflow.entity.data.Slot;
import com.thebeastshop.liteflow.exception.ChainNotFoundException;
import com.thebeastshop.liteflow.exception.ComponentNotAccessException;
import com.thebeastshop.liteflow.exception.FlowExecutorNotInitException;
import com.thebeastshop.liteflow.exception.FlowSystemException;
import com.thebeastshop.liteflow.exception.NoAvailableSlotException;
import com.thebeastshop.liteflow.exception.ParseException;
import com.thebeastshop.liteflow.flow.FlowBus;
import com.thebeastshop.liteflow.parser.FlowParser;
import com.thebeastshop.liteflow.parser.LocalXmlFlowParser;
import com.thebeastshop.liteflow.parser.XmlFlowParser;
import com.thebeastshop.liteflow.parser.ZookeeperXmlFlowParser;
import com.thebeastshop.liteflow.util.LOGOPrinter;
public class FlowExecutor {
@@ -41,17 +45,36 @@ public class FlowExecutor {
private List<String> rulePath;
private String zkNode;
public void init() {
XmlFlowParser parser = null;
for(String path : rulePath){
try {
FlowParser.parseLocal(path);
if(isZKConfig(path)) {
if(StringUtils.isNotBlank(zkNode)) {
parser = new ZookeeperXmlFlowParser(zkNode);
}else {
parser = new ZookeeperXmlFlowParser();
}
}else {
parser = new LocalXmlFlowParser();
}
parser.parseMain(path);
} catch (Exception e) {
String errorMsg = MessageFormat.format("init flow executor cause error,cannot parse rule file{0}", path);
LOG.error(errorMsg,e);
throw new FlowExecutorNotInitException(errorMsg);
}
}
}
private boolean isZKConfig(String path) {
Pattern p = Pattern.compile("[\\w\\d][\\w\\d\\.]+\\:(\\d)+(\\,[\\w\\d][\\w\\d\\.]+\\:(\\d)+)*");
Matcher m = p.matcher(path);
return m.find();
}
public void reloadRule(){
init();
}
@@ -200,4 +223,12 @@ public class FlowExecutor {
public void setRulePath(List<String> rulePath) {
this.rulePath = rulePath;
}
public String getZkNode() {
return zkNode;
}
public void setZkNode(String zkNode) {
this.zkNode = zkNode;
}
}

View File

@@ -20,8 +20,9 @@ import com.thebeastshop.liteflow.entity.data.CmpStepType;
import com.thebeastshop.liteflow.entity.data.DataBus;
import com.thebeastshop.liteflow.entity.data.Slot;
import com.thebeastshop.liteflow.entity.monitor.CompStatistics;
import com.thebeastshop.liteflow.flow.FlowBus;
import com.thebeastshop.liteflow.monitor.MonitorBus;
import com.thebeastshop.liteflow.parser.FlowParser;
import com.thebeastshop.liteflow.parser.LocalXmlFlowParser;
public abstract class NodeComponent {
@@ -37,12 +38,11 @@ public abstract class NodeComponent {
slot.addStep(new CmpStep(nodeId, CmpStepType.START));
StopWatch stopWatch = new StopWatch();
stopWatch.start();
long initm=Runtime.getRuntime().freeMemory();
process();
stopWatch.stop();
long timeSpent = stopWatch.getTime();
long endm=Runtime.getRuntime().freeMemory();
slot.addStep(new CmpStep(nodeId, CmpStepType.END));
@@ -56,7 +56,7 @@ public abstract class NodeComponent {
if(this instanceof NodeCondComponent){
String condNodeId = slot.getCondResult(this.getClass().getName());
if(StringUtils.isNotBlank(condNodeId)){
Node thisNode = FlowParser.getNode(nodeId);
Node thisNode = FlowBus.getNode(nodeId);
Node condNode = thisNode.getCondNode(condNodeId);
if(condNode != null){
NodeComponent condComponent = condNode.getInstance();

View File

@@ -0,0 +1,21 @@
package com.thebeastshop.liteflow.exception;
public class ParseException extends RuntimeException {
private static final long serialVersionUID = 1L;
/** 异常信息 */
private String message;
public ParseException(String message) {
this.message = message;
}
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
}

View File

@@ -15,11 +15,14 @@ import java.util.Map;
import org.apache.commons.collections4.MapUtils;
import com.thebeastshop.liteflow.entity.config.Chain;
import com.thebeastshop.liteflow.entity.config.Node;
public class FlowBus {
private static Map<String, Chain> chainMap;
private static Map<String, Node> nodeMap;
public static Chain getChain(String id) throws Exception{
if(chainMap == null || chainMap.isEmpty()){
throw new Exception("please config the rule first");
@@ -37,4 +40,15 @@ public class FlowBus {
public static boolean needInit() {
return MapUtils.isEmpty(chainMap);
}
public static void addNode(String nodeId, Node node) {
if(nodeMap == null) {
nodeMap = new HashMap<String, Node>();
}
nodeMap.put(nodeId, node);
}
public static Node getNode(String nodeId) {
return nodeMap.get(nodeId);
}
}

View File

@@ -0,0 +1,22 @@
/**
* <p>Title: liteFlow</p>
* <p>Description: 轻量级的组件式流程框架</p>
* <p>Copyright: Copyright (c) 2017</p>
* @author Bryan.Zhang
* @email weenyc31@163.com
* @Date 2017-7-28
* @version 1.0
*/
package com.thebeastshop.liteflow.parser;
import com.thebeastshop.liteflow.util.IOUtil;
public class LocalXmlFlowParser extends XmlFlowParser{
private final String ENCODING_FORMAT = "UTF-8";
public void parseMain(String rulePath) throws Exception {
String ruleContent = IOUtil.read(rulePath, ENCODING_FORMAT);
parse(ruleContent);
}
}

View File

@@ -0,0 +1,32 @@
package com.thebeastshop.liteflow.parser;
import java.util.Arrays;
public class RegexEntity {
private String condNode;
private String[] realNodeArray;
public String getCondNode() {
return condNode;
}
public void setCondNode(String condNode) {
this.condNode = condNode;
}
public String[] getRealNodeArray() {
return realNodeArray;
}
public void setRealNodeArray(String[] realNodeArray) {
this.realNodeArray = realNodeArray;
}
@Override
public String toString() {
return "RegexEntity [condNode=" + condNode + ", realNodeArray="
+ Arrays.toString(realNodeArray) + "]";
}
}

View File

@@ -1,20 +1,8 @@
/**
* <p>Title: liteFlow</p>
* <p>Description: 轻量级的组件式流程框架</p>
* <p>Copyright: Copyright (c) 2017</p>
* @author Bryan.Zhang
* @email weenyc31@163.com
* @Date 2017-7-28
* @version 1.0
*/
package com.thebeastshop.liteflow.parser;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -34,28 +22,20 @@ import com.thebeastshop.liteflow.entity.config.WhenCondition;
import com.thebeastshop.liteflow.flow.FlowBus;
import com.thebeastshop.liteflow.spring.ComponentScaner;
import com.thebeastshop.liteflow.util.Dom4JReader;
import com.thebeastshop.liteflow.util.IOUtil;
@SuppressWarnings("unchecked")
public class FlowParser {
private static final Logger LOG = LoggerFactory.getLogger(FlowParser.class);
private static final String ENCODING_FORMAT = "UTF-8";
public abstract class XmlFlowParser {
private static Map<String, Node> nodeMap = new HashMap<String, Node>();
public static void parseLocal(String rulePath) throws Exception {
String ruleContent = IOUtil.read(rulePath, ENCODING_FORMAT);
parse(ruleContent);
}
public static void parse(String content) throws Exception {
private final Logger LOG = LoggerFactory.getLogger(XmlFlowParser.class);
public abstract void parseMain(String path) throws Exception;
public void parse(String content) throws Exception {
Document document = Dom4JReader.getFormatDocument(content);
parse(document);
}
public static void parse(Document document) throws Exception {
@SuppressWarnings("unchecked")
public void parse(Document document) throws Exception {
try {
Element rootElement = document.getRootElement();
@@ -79,11 +59,11 @@ public class FlowParser {
}
component.setNodeId(id);
node.setInstance(component);
nodeMap.put(id, node);
FlowBus.addNode(id, node);
}
}else{
for(Entry<String, NodeComponent> componentEntry : ComponentScaner.nodeComponentMap.entrySet()){
nodeMap.put(componentEntry.getKey(), new Node(componentEntry.getKey(), componentEntry.getValue().getClass().getName(), componentEntry.getValue()));
FlowBus.addNode(componentEntry.getKey(), new Node(componentEntry.getKey(), componentEntry.getValue().getClass().getName(), componentEntry.getValue()));
}
}
@@ -110,11 +90,11 @@ public class FlowParser {
Node node = null;
for (int i = 0; i < condArray.length; i++) {
regexEntity = parseNodeStr(condArray[i].trim());
node = nodeMap.get(regexEntity.getCondNode());
node = FlowBus.getNode(regexEntity.getCondNode());
chainNodeList.add(node);
if(regexEntity.getRealNodeArray() != null){
for(String key : regexEntity.getRealNodeArray()){
Node condNode = nodeMap.get(key);
Node condNode = FlowBus.getNode(key);
if(condNode != null){
node.setCondNode(condNode.getId(), condNode);
}
@@ -135,39 +115,6 @@ public class FlowParser {
}
public static Node getNode(String nodeId){
return nodeMap.get(nodeId);
}
private static class RegexEntity{
private String condNode;
private String[] realNodeArray;
public String getCondNode() {
return condNode;
}
public void setCondNode(String condNode) {
this.condNode = condNode;
}
public String[] getRealNodeArray() {
return realNodeArray;
}
public void setRealNodeArray(String[] realNodeArray) {
this.realNodeArray = realNodeArray;
}
@Override
public String toString() {
return "RegexEntity [condNode=" + condNode + ", realNodeArray="
+ Arrays.toString(realNodeArray) + "]";
}
}
public static RegexEntity parseNodeStr(String str) {
List<String> list = new ArrayList<String>();
Pattern p = Pattern.compile("[^\\)\\(]+");
@@ -186,8 +133,4 @@ public class FlowParser {
}
return regexEntity;
}
public static void main(String[] args) {
System.out.println(parseNodeStr("aaaa ( xxxx | yyyy | vvvv )"));
}
}

View File

@@ -0,0 +1,63 @@
package com.thebeastshop.liteflow.parser;
import java.text.MessageFormat;
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.retry.RetryNTimes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.thebeastshop.liteflow.exception.ParseException;
public class ZookeeperXmlFlowParser extends XmlFlowParser{
private static final Logger LOG = LoggerFactory.getLogger(ZookeeperXmlFlowParser.class);
private String nodePath = "/lite-flow/flow";
public ZookeeperXmlFlowParser() {
}
public ZookeeperXmlFlowParser(String node) {
nodePath = node;
}
@Override
public void parseMain(String path) throws Exception {
CuratorFramework client = CuratorFrameworkFactory.newClient(
path,
new RetryNTimes(10, 5000)
);
client.start();
if (client.checkExists().forPath(nodePath) == null) {
client.create().creatingParentsIfNeeded().forPath(nodePath, "".getBytes());
}
String content = new String(client.getData().forPath(nodePath));
if(StringUtils.isBlank(content)) {
String error = MessageFormat.format("the node[{0}] value is empty", nodePath);
throw new ParseException(error);
}
parse(content);
final NodeCache cache = new NodeCache(client,nodePath);
cache.start();
cache.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
String content = new String(cache.getCurrentData().getData());
LOG.info("stating load flow config....");
parse(content);
}
});
}
}

View File

@@ -15,7 +15,7 @@ public class LOGOPrinter {
str.append(" | | | | | | | _| _____| |_ | | | | | \\ \\ /\\ / / \n");
str.append(" | |___ | | | | | |__|_____| _| | |__| |_| |\\ V V / \n");
str.append(" |_____|___| |_| |_____| |_| |_____\\___/ \\_/\\_/ \n\n");
str.append(" 做最轻量级,最吊炸天的微流程框架\n");
str.append(" 做最轻量级,最实用的微流程框架\n");
str.append(" To be the most lightweight and the most practical micro-process framework\n");
str.append("================================================================================================\n");
LOG.info(str.toString());

View File

@@ -19,7 +19,7 @@ import com.thebeastshop.liteflow.entity.data.Slot;
@ContextConfiguration(locations = { "classpath:spring-test.xml" })
public class TestWithSpringMain {
@Resource
@Resource(name="flowExecutor")
private FlowExecutor flowExecutor;
@Test
@@ -50,4 +50,17 @@ public class TestWithSpringMain {
}
}
@Test
public void test3() throws Exception {
try {
while(true) {
Slot slot = flowExecutor.execute("chain3", "it's a request");
Thread.sleep(2000);
}
}catch(Exception e) {
e.printStackTrace();
}
}
}

View File

@@ -0,0 +1,135 @@
package com.thebeastshop.liteflow.test.curator;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.RetryNTimes;
public class CuratorTest {
/** Zookeeper info */
private static final String ZK_ADDRESS = "123.206.92.144:2181,123.206.92.144:2182,123.206.92.144:2183";
private static final String ZK_PATH = "/zktest/a1/aa1";
public static void main(String[] args) throws Exception {
// 1.Connect to zk
CuratorFramework client = CuratorFrameworkFactory.newClient(
ZK_ADDRESS,
new RetryNTimes(10, 5000)
);
client.start();
checkNode(client);
// childNodeListen(client);
// removeNodeData(client);
// createNode(client);
// nodeListen(client);
//
// modifyNodeData(client);
System.in.read();
// getNodeData(client);
//
//
}
private static void checkNode(CuratorFramework client) throws Exception {
System.out.println(client.checkExists().forPath("/test"));
}
private static void createNode(CuratorFramework client) throws Exception {
String data1 = "nice to meet you";
print("create", ZK_PATH, data1);
client.create().
creatingParentsIfNeeded().
forPath(ZK_PATH, data1.getBytes());
}
private static void getNodeData(CuratorFramework client) throws Exception {
print("ls", "/");
print(client.getChildren().forPath("/"));
print("get", ZK_PATH);
print(client.getData().forPath(ZK_PATH));
}
private static void modifyNodeData(CuratorFramework client) throws Exception {
String data2 = "world for u";
print("set", ZK_PATH, data2);
client.setData().forPath(ZK_PATH, data2.getBytes());
print("get", ZK_PATH);
print(client.getData().forPath(ZK_PATH));
}
private static void removeNodeData(CuratorFramework client) throws Exception {
print("delete", "/zktest/dddd");
client.delete().forPath("/zktest/dddd");
print("ls", "/");
print(client.getChildren().forPath("/"));
}
private static void nodeListen(CuratorFramework client) throws Exception {
final NodeCache cache = new NodeCache(client,ZK_PATH);
cache.start();
cache.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
byte[] res = cache.getCurrentData().getData();
System.out.println("data: " + new String(res));
}
});
}
private static void childNodeListen(CuratorFramework client) throws Exception {
final PathChildrenCache cache = new PathChildrenCache(client,"/zktest",true);
cache.start();
cache.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework curator, PathChildrenCacheEvent event) throws Exception {
switch (event.getType()) {
case CHILD_ADDED:
System.out.println("add:" + event.getData().getPath() + ":" + new String(event.getData().getData()));
break;
case CHILD_UPDATED:
System.out.println("update:" + event.getData().getPath() + ":" + new String(event.getData().getData()));
break;
case CHILD_REMOVED:
System.out.println("remove:" + event.getData().getPath() + ":" + new String(event.getData().getData()));
break;
default:
break;
}
}
});
}
private static void print(String... cmds) {
StringBuilder text = new StringBuilder("$ ");
for (String cmd : cmds) {
text.append(cmd).append(" ");
}
System.out.println(text.toString());
}
private static void print(Object result) {
System.out.println(
result instanceof byte[]
? new String((byte[]) result)
: result);
}
}

View File

@@ -0,0 +1,121 @@
package com.thebeastshop.liteflow.test.curator;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.RetryNTimes;
public class CuratorTest2 {
/** Zookeeper info */
private static final String ZK_ADDRESS = "114.55.174.189:2181";
private static final String ZK_PATH = "/zktest/ffff";
public static void main(String[] args) throws Exception {
// 1.Connect to zk
CuratorFramework client = CuratorFrameworkFactory.newClient(
ZK_ADDRESS,
new RetryNTimes(10, 5000)
);
client.start();
// removeNodeData(client);
// createNode(client);
// nodeListen(client);
//
modifyNodeData(client);
}
private static void createNode(CuratorFramework client) throws Exception {
String data1 = "hello";
print("create", ZK_PATH, data1);
client.create().
creatingParentsIfNeeded().
forPath(ZK_PATH, data1.getBytes());
}
private static void getNodeData(CuratorFramework client) throws Exception {
print("ls", "/");
print(client.getChildren().forPath("/"));
print("get", ZK_PATH);
print(client.getData().forPath(ZK_PATH));
}
private static void modifyNodeData(CuratorFramework client) throws Exception {
String data2 = "world for u";
print("set", ZK_PATH, data2);
client.setData().forPath(ZK_PATH, data2.getBytes());
print("get", ZK_PATH);
print(client.getData().forPath(ZK_PATH));
}
private static void removeNodeData(CuratorFramework client) throws Exception {
print("delete", "/zktest/dddd");
client.delete().forPath("/zktest/dddd");
print("ls", "/");
print(client.getChildren().forPath("/"));
}
private static void nodeListen(CuratorFramework client) throws Exception {
final NodeCache cache = new NodeCache(client,ZK_PATH);
cache.start();
cache.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
byte[] res = cache.getCurrentData().getData();
System.out.println("data: " + new String(res));
}
});
}
private static void childNodeListen(CuratorFramework client) throws Exception {
final PathChildrenCache cache = new PathChildrenCache(client,"/zktest",true);
cache.start();
cache.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework curator, PathChildrenCacheEvent event) throws Exception {
switch (event.getType()) {
case CHILD_ADDED:
System.out.println("add:" + event.getData().getPath() + ":" + new String(event.getData().getData()));
break;
case CHILD_UPDATED:
System.out.println("update:" + event.getData().getPath() + ":" + new String(event.getData().getData()));
break;
case CHILD_REMOVED:
System.out.println("remove:" + event.getData().getPath() + ":" + new String(event.getData().getData()));
break;
default:
break;
}
}
});
}
private static void print(String... cmds) {
StringBuilder text = new StringBuilder("$ ");
for (String cmd : cmds) {
text.append(cmd).append(" ");
}
System.out.println(text.toString());
}
private static void print(Object result) {
System.out.println(
result instanceof byte[]
? new String((byte[]) result)
: result);
}
}

View File

@@ -0,0 +1,23 @@
package com.thebeastshop.liteflow.test.regex;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
public class RegexTest {
public static void main(String[] args) {
String str = "192.168.1.1:2181,192.168.1.2:2182,192.168.1.3:2183";
List<String> list = new ArrayList<String>();
Pattern p = Pattern.compile("[\\w\\d][\\w\\d\\.]+\\:(\\d)+(\\,[\\w\\d][\\w\\d\\.]+\\:(\\d)+)*");
Matcher m = p.matcher(str);
while(m.find()){
list.add(m.group());
}
System.out.println(list.size());
System.out.println(list);
}
}

View File

@@ -10,11 +10,21 @@
<context:component-scan base-package="com.thebeastshop.liteflow.test.component" />
<bean class="com.thebeastshop.liteflow.spring.ComponentScaner"/>
<bean id="flowExecutor" class="com.thebeastshop.liteflow.core.FlowExecutor">
<!-- <bean id="flowExecutor" class="com.thebeastshop.liteflow.core.FlowExecutor">
<property name="rulePath">
<list>
<value>config/flow.xml</value>
</list>
</property>
</bean> -->
<!-- 这种是zk方式配置 -->
<bean id="flowExecutor" class="com.thebeastshop.liteflow.core.FlowExecutor">
<property name="rulePath">
<list>
<value>123.206.92.144:2181,123.206.92.144:2182,123.206.92.144:2183</value>
</list>
</property>
<property name="zkNode" value="/lite-flow/customFlow"/><!-- 这个不配置就用默认的/lite-flow/flow节点 -->
</bean>
</beans>