diff --git a/README.md b/README.md index 93dc1efb3..a7d196635 100644 --- a/README.md +++ b/README.md @@ -1,63 +1,16 @@ -### 概述 -liteFlow是一个轻量级的组件式流程框架,帮助解耦业务代码,让每一个业务片段都是一个组件 +## 概述 +liteFlow是一个轻量,快速的组件式流程引擎框架,帮助解耦业务代码,让每一个业务片段都是一个组件,并支持热加载规则配置,实现即时修改。 -* 提供本地xml的流程配置(后续全面支持spring式流程配置) -* 提供基于spring的扫描方式注入component +[中文文档](http://yomahub.com/liteflow) + +## 特性 +* 提供本地xml的流程配置 +* 支持zookeeper流程配置,即时推送修改内容 +* 能自由扩展配置源,提供扩展接口 +* 和spring集成,支持spring的扫描方式 * 提供串行和并行2种模式。 -* 提供条件节点的模式。 * 消除组件之间参数传递,引入数据总线概念。 -* 自带简单的监控,能够知道每个组件的运行平均时间。消耗内存。(每隔10分钟会自动打印) +* 数据槽高并发隔离机制。 +* 提供无级嵌套条件节点的模式。 +* 自带简单的监控,能够知道每个组件的运行耗时排行 -### Quick Start -1. 定义组件需继承Component,项目启动时会被自动发现。 -2. 定义xml配置(例子) -```xml - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -``` -3.spring里声明执行器 -```xml - - - - flow.xml - - - - - - - -``` -4.开始一个流程 -```java -executor.execute("chain2", 参数); -``` \ No newline at end of file diff --git a/_config.yml b/_config.yml new file mode 100644 index 000000000..c4192631f --- /dev/null +++ b/_config.yml @@ -0,0 +1 @@ +theme: jekyll-theme-cayman \ No newline at end of file diff --git a/docs/guide_cn.md b/docs/guide_cn.md new file mode 100644 index 000000000..def955586 --- /dev/null +++ b/docs/guide_cn.md @@ -0,0 +1,270 @@ +# 一、快速开始 +liteflow需要你的项目使用maven +## 1.1依赖 +```xml + + com.thebeastshop.liteflow + liteflow + ${liteFlow.version} + +``` +最新版本为**2.0.1**   +稳定版本为**1.3.1** +## 1.2流程配置文件 +```xml + + + + + + + + + + + + + + + +``` + +component为组件,这里你需要实现这些组件,每个组件继承`NodeComponent`类 +```java +public class AComponent extends NodeComponent { + + @Override + public void process() { + String str = this.getSlot().getRequestData(); + System.out.println(str); + System.out.println("Acomponent executed!"); + } +} +``` + +chain为流程链,每个链上可配置多个组件节点。目前执行的模式分串行和并行2种。 +串行标签为`then`,并行标签为`when`。 +在串行的模式下,以下2种写法是等价的,可以根据业务需要来把不同种类的节点放一行里。 +```xml + +``` +```xml + + +``` + +## 1.3执行流程链 +```java +FlowExecutor executor = new FlowExecutor(); +executor.setRulePath(Arrays.asList(new String[]{"/config/flow.xml"})); +executor.init(); +Slot slot = executor.execute("demoChain", "arg"); +``` + +如果你的项目使用spring,推荐参考[和Spring进行集成](http://yomahub.com/liteflow/#/?id=%e4%ba%8c%e3%80%81%e5%92%8cspring%e8%bf%9b%e8%a1%8c%e9%9b%86%e6%88%90) + +# 二、和spring进行集成 +## 2.1流程配置可以省略的部分 +流程配置中的`nodes`节点,可以不用配置了,支持spring的自动扫描方式。你需要在你的spring配置文件中定义 +```xml + + +``` + +当然,你的组件节点也需要注册进spirng容器 +```java +@Component("a") +public class AComponent extends NodeComponent + @Override + public void process() { + String str = this.getSlot().getRequestData(); + System.out.println(str); + System.out.println("Acomponent executed!"); + } +} +``` + +## 2.2spring中执行器的配置 +```xml + + + + /config/flow.xml + + + +``` +然后你的项目中通过spring拿到执行器进行调用流程。 + +# 三、和zookeeper进行集成 +## 3.1spring配置 +liteFlow支持把配置放在zk集群中,并支持实时修改流程 +你只需在原来配置执行器的地方,把本地xml路径换成zk地址就ok了 +```xml + + + + + 127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183 + + + + + +``` + +如果你不加zkNode这个标签,就用默认的节点路径进行读取配置。 +使用这种方式加载配置,在zk上进行更改配置。liteFlow会实时刷新配置。 + +# 四、使用自定义的配置源 +## 4.1创建自定义配置源的类 +如果你不想用本地的配置,也不打算使用zk作为配置持久化工具。liteFlow支持自定义的配置源的扩展点。 +在你的项目中创建一个类继承`ClassXmlFlowParser`这个类 +```java +public class TestCustomParser extends ClassXmlFlowParser { + + @Override + public String parseCustom() { + System.out.println("进入自定义parser"); + String xmlContent = null; + //这里需要自己扩展从自定义的地方获取配置 + return xmlContent; + } +} +``` + +## 4.2Spring配置 +spring中需要改的地方还是执行器的配置,只需要在配置的路径地方放入自定义类的类路径即可 +```xml + + + + com.thebeastshop.liteflow.test.TestCustomParser + + + +``` + +# 五、架构设计 +## 5.1组件式流程引擎架构设计 +![architecture_image](https://raw.githubusercontent.com/thebeastshop/liteFlow/master/docs/images/architecture.png) +Handler Unit:我们想象成每一个业务都是一个业务组件,每一个业务组件就是一个handlerUnit(处理单元) +EPU:这里的epu对应的就是我们的执行器,用来统筹并处理handlerUnit。相当于计算机的CPU +Event Bus:事件总线,用来指定下一个命令是什么,该如何去执行处理单元。这里的时间总线由我们的配置构成 +Data Bus:数据总线,用来存储整个调用链里数据。每一个请求生成一个数据槽。一个数据里最多有1024个数据槽 + +# 六、接入详细指南 +## 6.1执行器 +执行器`FlowExecutor`用来执行一个流程,用法为 +```java +public T execute(String chainId,Object param); +``` +第一个参数为流程ID,第二个参数为流程入参 +返回为`Slot`接口的子类,以上方法所返回的为默认的实现类`DefaultSlot` + +!> 实际在使用时,并不推荐用默认的`DefaultSlot`,推荐自己新建一个类继承`AbsSlot`类 +这是因为默认Slot实现类里面大多数都存放元数据,给用户扩展的数据存储是一个弱类型的Map +而用户自定义的Slot可以实现强类型的数据,这样对开发者更加友好 + +推荐使用带自定义Slot的执行接口: +```java +public T execute(String chainId,Object param,Class slotClazz); +``` + +关于`Slot`的说明,请参照[数据槽](http://yomahub.com/liteflow/#/?id=_62%e6%95%b0%e6%8d%ae%e6%a7%bd) + +## 6.2数据槽 +在执行器执行流程时会分配唯一的一个数据槽给这个请求。不同请求的数据槽是完全隔离的。 +数据槽实际上是一个Map,里面存放着liteFlow的元数据 +比如可以通过`getRequestData`获得流程的初始参数,通过`getChainName`获取流程的命名,通过`setInput`,`getInput`,`setOutput`,`getOutput`设置和获取属于某个组件专有的数据对象。当然也提供了最通用的方法`setData`和`getData`用来设置和获取业务的数据。 + +!> 不过这里还是推荐扩展出自定义的Slot(上一小章阐述了原因),自定义的Slot更加友好。更加贴合业务。 + +## 6.3组件节点 +组件节点需要继承`NodeComponent`类 +需要实现`process`方法 +但是推荐实现`isAccess`方法,表示是否进入该节点,可以用于业务参数的预先判断 + +其他几个可以覆盖的方法有: +方法`isContinueOnError`:表示出错是否继续往下执行下一个组件 +方法`isEnd`:表示是否立即结束整个流程 + +在组件节点里,随时可以通过方法`getSlot`获取当前的数据槽,从而可以获取任何数据。 + +## 6.4条件节点 +在实际业务中,往往要通过动态的业务逻辑判断到底接下去该执行哪一个节点 +```xml + + + + +``` +利用表达式可以很方便的进行条件的判断 +c节点是用来路由的,被称为条件节点,这种节点需要继承`NodeCondComponent`类 +需要实现方法`processCond`,这个方法需要返回`Class`类型,就是具体的结果节点 + +## 6.5嵌套执行 +liteFlow可以无极嵌套执行n条流程 +```java +@Component("h") +public class HComponent extends NodeComponent { + + @Resource + private FlowExecutor flowExecutor; + + @Override + public void process() { + System.out.println("Hcomponent executed!"); + flowExecutor.invoke("strategy1",3, DefaultSlot.class, this.getSlotIndex()); + } + +} +``` +这段代码演示了在某个业务节点内调用另外一个流程链的方法 + +## 6.6步骤打印 +liteFlow在执行每一条流程链后会打印步骤 +样例如下: +``` +a==>c==>h(START)==>m==>p==>p1==>h(END)==>g +``` +?> 其中h节点分start和end两个步骤,说明在h节点内调用了另一条流程。start和end之间的步骤就是另一条流程的步骤 + +## 6.7监控 +liteFlow提供了简单的监控,目前只统计一个指标:每个组件的平均耗时 +每5分钟会打印一次,并且是根据耗时时长倒序排的。 + +# 七、未来版本计划 +## 2.5版本 +* 支持更多的表达式,重写表达式解析器 +* 重新设计数据总线,解决数据槽热点问题 +* 增加一种驱动模式:消息驱动的模式 +* 对spring进行标签级支持 +* 对组件侵入更低,支持标注级声明 +* 增加监控的数据类型 + +## 2.6版本 +* 提供一个简单的组件注册中心 +* 有UI界面来查看监控数据 +* 此版本的重点功能:能用UI界面来回放整个执行过程(精确到数据槽里每一个对象) +* 此版本的重点功能:界面式设计规则 + +## 3.0版本 +主要是规则引擎的进化,制定规则文件。完善表达式引擎。 + +# 八、更新记录 +## 1.3.1更新日志 +优化大量潜在的问题,此版本为稳定版本,主要更新点如下: +* 增加条件节点功能 +* 优化异常捕获的日志打印 +* 支持自定义SLOT的特性 +* 优化步骤打印,能够支持开闭区间的打印方式 +* 增加了内部策略的调用方式 +* 增加了追踪ID +* 优化了监控打印 + +## 2.0.1更新日志 +更新点如下: +* 增加对zookeeper的支持 +* 增加自定义配置源 +* 优化监控的表现 diff --git a/docs/images/architecture.png b/docs/images/architecture.png new file mode 100644 index 000000000..e0cf70d4c Binary files /dev/null and b/docs/images/architecture.png differ diff --git a/pom.xml b/pom.xml index 993677321..e09a9a044 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ liteflow jar 4.0.0 - 1.3.1 + 2.0.1 UTF-8 @@ -21,6 +21,7 @@ 1.7.13 1.2.7 1.6.1 + 2.11.1 4.12 @@ -33,7 +34,7 @@ org.apache.commons commons-collections4 - 4.1 + ${commons-collections.version} commons-io @@ -100,6 +101,11 @@ commons-logging ${commons-logging.version} + + org.apache.curator + curator-recipes + ${curator.version} + diff --git a/src/main/java/com/thebeastshop/liteflow/core/FlowExecutor.java b/src/main/java/com/thebeastshop/liteflow/core/FlowExecutor.java index 420c477b6..aab712d65 100644 --- a/src/main/java/com/thebeastshop/liteflow/core/FlowExecutor.java +++ b/src/main/java/com/thebeastshop/liteflow/core/FlowExecutor.java @@ -13,6 +13,8 @@ import java.text.MessageFormat; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; @@ -27,12 +29,15 @@ import com.thebeastshop.liteflow.entity.data.DataBus; import com.thebeastshop.liteflow.entity.data.DefaultSlot; import com.thebeastshop.liteflow.entity.data.Slot; import com.thebeastshop.liteflow.exception.ChainNotFoundException; -import com.thebeastshop.liteflow.exception.ComponentNotAccessException; import com.thebeastshop.liteflow.exception.FlowExecutorNotInitException; import com.thebeastshop.liteflow.exception.FlowSystemException; import com.thebeastshop.liteflow.exception.NoAvailableSlotException; +import com.thebeastshop.liteflow.exception.ParseException; import com.thebeastshop.liteflow.flow.FlowBus; -import com.thebeastshop.liteflow.parser.FlowParser; +import com.thebeastshop.liteflow.parser.LocalXmlFlowParser; +import com.thebeastshop.liteflow.parser.XmlFlowParser; +import com.thebeastshop.liteflow.parser.ZookeeperXmlFlowParser; +import com.thebeastshop.liteflow.util.LOGOPrinter; public class FlowExecutor { @@ -40,17 +45,51 @@ public class FlowExecutor { private List rulePath; + private String zkNode; + public void init() { + XmlFlowParser parser = null; for(String path : rulePath){ try { - FlowParser.parseLocal(path); + if(isLocalConfig(path)) { + parser = new LocalXmlFlowParser(); + }else if(isZKConfig(path)){ + if(StringUtils.isNotBlank(zkNode)) { + parser = new ZookeeperXmlFlowParser(zkNode); + }else { + parser = new ZookeeperXmlFlowParser(); + } + }else if(isClassConfig(path)) { + Class c = Class.forName(path); + parser = (XmlFlowParser)c.newInstance(); + } + parser.parseMain(path); } catch (Exception e) { String errorMsg = MessageFormat.format("init flow executor cause error,cannot parse rule file{0}", path); + LOG.error(errorMsg,e); throw new FlowExecutorNotInitException(errorMsg); } } } + private boolean isZKConfig(String path) { + Pattern p = Pattern.compile("[\\w\\d][\\w\\d\\.]+\\:(\\d)+(\\,[\\w\\d][\\w\\d\\.]+\\:(\\d)+)*"); + Matcher m = p.matcher(path); + return m.find(); + } + + private boolean isLocalConfig(String path) { + Pattern p = Pattern.compile("^[\\w\\/]+(\\/\\w+)*\\.xml$"); + Matcher m = p.matcher(path); + return m.find(); + } + + private boolean isClassConfig(String path) { + Pattern p = Pattern.compile("^\\w+(\\.\\w+)*$"); + Matcher m = p.matcher(path); + return m.find(); + } + public void reloadRule(){ init(); } @@ -200,4 +239,12 @@ public class FlowExecutor { public void setRulePath(List rulePath) { this.rulePath = rulePath; } + + public String getZkNode() { + return zkNode; + } + + public void setZkNode(String zkNode) { + this.zkNode = zkNode; + } } diff --git a/src/main/java/com/thebeastshop/liteflow/core/NodeComponent.java b/src/main/java/com/thebeastshop/liteflow/core/NodeComponent.java index 0f77eb276..46c1f3882 100644 --- a/src/main/java/com/thebeastshop/liteflow/core/NodeComponent.java +++ b/src/main/java/com/thebeastshop/liteflow/core/NodeComponent.java @@ -20,8 +20,8 @@ import com.thebeastshop.liteflow.entity.data.CmpStepType; import com.thebeastshop.liteflow.entity.data.DataBus; import com.thebeastshop.liteflow.entity.data.Slot; import com.thebeastshop.liteflow.entity.monitor.CompStatistics; +import com.thebeastshop.liteflow.flow.FlowBus; import com.thebeastshop.liteflow.monitor.MonitorBus; -import com.thebeastshop.liteflow.parser.FlowParser; public abstract class NodeComponent { @@ -37,12 +37,11 @@ public abstract class NodeComponent { slot.addStep(new CmpStep(nodeId, CmpStepType.START)); StopWatch stopWatch = new StopWatch(); stopWatch.start(); - long initm=Runtime.getRuntime().freeMemory(); process(); + stopWatch.stop(); long timeSpent = stopWatch.getTime(); - long endm=Runtime.getRuntime().freeMemory(); slot.addStep(new CmpStep(nodeId, CmpStepType.END)); @@ -56,7 +55,7 @@ public abstract class NodeComponent { if(this instanceof NodeCondComponent){ String condNodeId = slot.getCondResult(this.getClass().getName()); if(StringUtils.isNotBlank(condNodeId)){ - Node thisNode = FlowParser.getNode(nodeId); + Node thisNode = FlowBus.getNode(nodeId); Node condNode = thisNode.getCondNode(condNodeId); if(condNode != null){ NodeComponent condComponent = condNode.getInstance(); diff --git a/src/main/java/com/thebeastshop/liteflow/exception/ParseException.java b/src/main/java/com/thebeastshop/liteflow/exception/ParseException.java new file mode 100644 index 000000000..99917bb13 --- /dev/null +++ b/src/main/java/com/thebeastshop/liteflow/exception/ParseException.java @@ -0,0 +1,21 @@ +package com.thebeastshop.liteflow.exception; + +public class ParseException extends RuntimeException { + + private static final long serialVersionUID = 1L; + + /** 异常信息 */ + private String message; + + public ParseException(String message) { + this.message = message; + } + + public String getMessage() { + return message; + } + + public void setMessage(String message) { + this.message = message; + } +} diff --git a/src/main/java/com/thebeastshop/liteflow/flow/FlowBus.java b/src/main/java/com/thebeastshop/liteflow/flow/FlowBus.java index d8130c33f..ce799e588 100644 --- a/src/main/java/com/thebeastshop/liteflow/flow/FlowBus.java +++ b/src/main/java/com/thebeastshop/liteflow/flow/FlowBus.java @@ -15,11 +15,14 @@ import java.util.Map; import org.apache.commons.collections4.MapUtils; import com.thebeastshop.liteflow.entity.config.Chain; +import com.thebeastshop.liteflow.entity.config.Node; public class FlowBus { private static Map chainMap; + private static Map nodeMap; + public static Chain getChain(String id) throws Exception{ if(chainMap == null || chainMap.isEmpty()){ throw new Exception("please config the rule first"); @@ -37,4 +40,15 @@ public class FlowBus { public static boolean needInit() { return MapUtils.isEmpty(chainMap); } + + public static void addNode(String nodeId, Node node) { + if(nodeMap == null) { + nodeMap = new HashMap(); + } + nodeMap.put(nodeId, node); + } + + public static Node getNode(String nodeId) { + return nodeMap.get(nodeId); + } } diff --git a/src/main/java/com/thebeastshop/liteflow/parser/ClassXmlFlowParser.java b/src/main/java/com/thebeastshop/liteflow/parser/ClassXmlFlowParser.java new file mode 100644 index 000000000..692b47e03 --- /dev/null +++ b/src/main/java/com/thebeastshop/liteflow/parser/ClassXmlFlowParser.java @@ -0,0 +1,11 @@ +package com.thebeastshop.liteflow.parser; + +public abstract class ClassXmlFlowParser extends XmlFlowParser { + @Override + public void parseMain(String path) throws Exception { + String content = parseCustom(); + parse(content); + } + + public abstract String parseCustom(); +} diff --git a/src/main/java/com/thebeastshop/liteflow/parser/LocalXmlFlowParser.java b/src/main/java/com/thebeastshop/liteflow/parser/LocalXmlFlowParser.java new file mode 100644 index 000000000..a8a929bf0 --- /dev/null +++ b/src/main/java/com/thebeastshop/liteflow/parser/LocalXmlFlowParser.java @@ -0,0 +1,22 @@ +/** + *

Title: liteFlow

+ *

Description: 轻量级的组件式流程框架

+ *

Copyright: Copyright (c) 2017

+ * @author Bryan.Zhang + * @email weenyc31@163.com + * @Date 2017-7-28 + * @version 1.0 + */ +package com.thebeastshop.liteflow.parser; + +import com.thebeastshop.liteflow.util.IOUtil; + +public class LocalXmlFlowParser extends XmlFlowParser{ + + private final String ENCODING_FORMAT = "UTF-8"; + + public void parseMain(String rulePath) throws Exception { + String ruleContent = IOUtil.read(rulePath, ENCODING_FORMAT); + parse(ruleContent); + } +} diff --git a/src/main/java/com/thebeastshop/liteflow/parser/RegexEntity.java b/src/main/java/com/thebeastshop/liteflow/parser/RegexEntity.java new file mode 100644 index 000000000..55ffd84ee --- /dev/null +++ b/src/main/java/com/thebeastshop/liteflow/parser/RegexEntity.java @@ -0,0 +1,32 @@ +package com.thebeastshop.liteflow.parser; + +import java.util.Arrays; + +public class RegexEntity { + + private String condNode; + + private String[] realNodeArray; + + public String getCondNode() { + return condNode; + } + + public void setCondNode(String condNode) { + this.condNode = condNode; + } + + public String[] getRealNodeArray() { + return realNodeArray; + } + + public void setRealNodeArray(String[] realNodeArray) { + this.realNodeArray = realNodeArray; + } + + @Override + public String toString() { + return "RegexEntity [condNode=" + condNode + ", realNodeArray=" + + Arrays.toString(realNodeArray) + "]"; + } +} diff --git a/src/main/java/com/thebeastshop/liteflow/parser/TempConvert.java b/src/main/java/com/thebeastshop/liteflow/parser/TempConvert.java deleted file mode 100644 index b8f4818a6..000000000 --- a/src/main/java/com/thebeastshop/liteflow/parser/TempConvert.java +++ /dev/null @@ -1,64 +0,0 @@ -package com.thebeastshop.liteflow.parser; - -import org.springframework.stereotype.Component; - -import java.util.ArrayList; -import java.util.List; -import java.util.Stack; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -/** - * 类型转换 - * @author gongjun[jun.gong@thebeastshop.com] - * @since 2017-11-22 15:13 - */ -@Component -public class TempConvert { - - - - private static List match(String input) { - List list = new ArrayList(); - Stack stack = new Stack<>(); - StringBuffer buffer = new StringBuffer(); - for (int i = 0; i < input.length(); i++) { - char c = input.charAt(i); - if (c == '(') { - stack.push(c); - if (stack.size() == 1 && buffer.length() > 0) { - list.add(buffer.toString()); - buffer = new StringBuffer(); - }else { - buffer.append(c); - } - }else if (c == ')') { - if (stack.size() > 0) { - stack.pop(); - if (stack.size() == 0) { - if (buffer.length() > 0) { - list.add(buffer.toString()); - buffer = new StringBuffer(); - } - }else { - buffer.append(c); - } - } - }else { - buffer.append(c); - } - } - if (buffer.length() > 0) { - list.add(buffer.toString()); - } - return list; - } - - - public static void main(String[] args) { - List list = new ArrayList(); - String input = "aaaa(bbb(xxxxx|yyyy))"; - list = match(input); - System.out.println(list); - } -} diff --git a/src/main/java/com/thebeastshop/liteflow/parser/FlowParser.java b/src/main/java/com/thebeastshop/liteflow/parser/XmlFlowParser.java similarity index 67% rename from src/main/java/com/thebeastshop/liteflow/parser/FlowParser.java rename to src/main/java/com/thebeastshop/liteflow/parser/XmlFlowParser.java index 6537e376c..ef84df3aa 100644 --- a/src/main/java/com/thebeastshop/liteflow/parser/FlowParser.java +++ b/src/main/java/com/thebeastshop/liteflow/parser/XmlFlowParser.java @@ -1,20 +1,8 @@ -/** - *

Title: liteFlow

- *

Description: 轻量级的组件式流程框架

- *

Copyright: Copyright (c) 2017

- * @author Bryan.Zhang - * @email weenyc31@163.com - * @Date 2017-7-28 - * @version 1.0 - */ package com.thebeastshop.liteflow.parser; import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; import java.util.Iterator; import java.util.List; -import java.util.Map; import java.util.Map.Entry; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -34,28 +22,20 @@ import com.thebeastshop.liteflow.entity.config.WhenCondition; import com.thebeastshop.liteflow.flow.FlowBus; import com.thebeastshop.liteflow.spring.ComponentScaner; import com.thebeastshop.liteflow.util.Dom4JReader; -import com.thebeastshop.liteflow.util.IOUtil; -@SuppressWarnings("unchecked") -public class FlowParser { - - private static final Logger LOG = LoggerFactory.getLogger(FlowParser.class); - - private static final String ENCODING_FORMAT = "UTF-8"; +public abstract class XmlFlowParser { - private static Map nodeMap = new HashMap(); - - public static void parseLocal(String rulePath) throws Exception { - String ruleContent = IOUtil.read(rulePath, ENCODING_FORMAT); - parse(ruleContent); - } - - public static void parse(String content) throws Exception { + private final Logger LOG = LoggerFactory.getLogger(XmlFlowParser.class); + + public abstract void parseMain(String path) throws Exception; + + public void parse(String content) throws Exception { Document document = Dom4JReader.getFormatDocument(content); parse(document); } - public static void parse(Document document) throws Exception { + @SuppressWarnings("unchecked") + public void parse(Document document) throws Exception { try { Element rootElement = document.getRootElement(); @@ -79,11 +59,11 @@ public class FlowParser { } component.setNodeId(id); node.setInstance(component); - nodeMap.put(id, node); + FlowBus.addNode(id, node); } }else{ for(Entry componentEntry : ComponentScaner.nodeComponentMap.entrySet()){ - nodeMap.put(componentEntry.getKey(), new Node(componentEntry.getKey(), componentEntry.getValue().getClass().getName(), componentEntry.getValue())); + FlowBus.addNode(componentEntry.getKey(), new Node(componentEntry.getKey(), componentEntry.getValue().getClass().getName(), componentEntry.getValue())); } } @@ -110,11 +90,11 @@ public class FlowParser { Node node = null; for (int i = 0; i < condArray.length; i++) { regexEntity = parseNodeStr(condArray[i].trim()); - node = nodeMap.get(regexEntity.getCondNode()); + node = FlowBus.getNode(regexEntity.getCondNode()); chainNodeList.add(node); if(regexEntity.getRealNodeArray() != null){ for(String key : regexEntity.getRealNodeArray()){ - Node condNode = nodeMap.get(key); + Node condNode = FlowBus.getNode(key); if(condNode != null){ node.setCondNode(condNode.getId(), condNode); } @@ -135,39 +115,6 @@ public class FlowParser { } - public static Node getNode(String nodeId){ - return nodeMap.get(nodeId); - } - - private static class RegexEntity{ - - private String condNode; - - private String[] realNodeArray; - - public String getCondNode() { - return condNode; - } - - public void setCondNode(String condNode) { - this.condNode = condNode; - } - - public String[] getRealNodeArray() { - return realNodeArray; - } - - public void setRealNodeArray(String[] realNodeArray) { - this.realNodeArray = realNodeArray; - } - - @Override - public String toString() { - return "RegexEntity [condNode=" + condNode + ", realNodeArray=" - + Arrays.toString(realNodeArray) + "]"; - } - } - public static RegexEntity parseNodeStr(String str) { List list = new ArrayList(); Pattern p = Pattern.compile("[^\\)\\(]+"); @@ -186,8 +133,4 @@ public class FlowParser { } return regexEntity; } - - public static void main(String[] args) { - System.out.println(parseNodeStr("aaaa ( xxxx | yyyy | vvvv )")); - } } diff --git a/src/main/java/com/thebeastshop/liteflow/parser/ZookeeperXmlFlowParser.java b/src/main/java/com/thebeastshop/liteflow/parser/ZookeeperXmlFlowParser.java new file mode 100644 index 000000000..047aca383 --- /dev/null +++ b/src/main/java/com/thebeastshop/liteflow/parser/ZookeeperXmlFlowParser.java @@ -0,0 +1,63 @@ +package com.thebeastshop.liteflow.parser; + +import java.text.MessageFormat; + +import org.apache.commons.lang3.StringUtils; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.recipes.cache.NodeCache; +import org.apache.curator.framework.recipes.cache.NodeCacheListener; +import org.apache.curator.retry.RetryNTimes; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import com.thebeastshop.liteflow.exception.ParseException; + +public class ZookeeperXmlFlowParser extends XmlFlowParser{ + + private static final Logger LOG = LoggerFactory.getLogger(ZookeeperXmlFlowParser.class); + + private String nodePath = "/lite-flow/flow"; + + public ZookeeperXmlFlowParser() { + + } + + public ZookeeperXmlFlowParser(String node) { + nodePath = node; + } + + @Override + public void parseMain(String path) throws Exception { + CuratorFramework client = CuratorFrameworkFactory.newClient( + path, + new RetryNTimes(10, 5000) + ); + client.start(); + + if (client.checkExists().forPath(nodePath) == null) { + client.create().creatingParentsIfNeeded().forPath(nodePath, "".getBytes()); + } + + String content = new String(client.getData().forPath(nodePath)); + + + if(StringUtils.isBlank(content)) { + String error = MessageFormat.format("the node[{0}] value is empty", nodePath); + throw new ParseException(error); + } + parse(content); + + + final NodeCache cache = new NodeCache(client,nodePath); + cache.start(); + + cache.getListenable().addListener(new NodeCacheListener() { + @Override + public void nodeChanged() throws Exception { + String content = new String(cache.getCurrentData().getData()); + LOG.info("stating load flow config...."); + parse(content); + } + }); + } +} diff --git a/src/main/java/com/thebeastshop/liteflow/spring/ComponentScaner.java b/src/main/java/com/thebeastshop/liteflow/spring/ComponentScaner.java index 81c74891a..60c3993f1 100644 --- a/src/main/java/com/thebeastshop/liteflow/spring/ComponentScaner.java +++ b/src/main/java/com/thebeastshop/liteflow/spring/ComponentScaner.java @@ -19,6 +19,7 @@ import org.springframework.core.Ordered; import org.springframework.core.PriorityOrdered; import com.thebeastshop.liteflow.core.NodeComponent; import com.thebeastshop.liteflow.entity.config.Node; +import com.thebeastshop.liteflow.util.LOGOPrinter; public class ComponentScaner implements BeanPostProcessor, PriorityOrdered { @@ -26,6 +27,10 @@ public class ComponentScaner implements BeanPostProcessor, PriorityOrdered { public static Map nodeComponentMap = new HashMap(); + static { + LOGOPrinter.print(); + } + @Override public int getOrder() { return Ordered.LOWEST_PRECEDENCE; diff --git a/src/main/java/com/thebeastshop/liteflow/util/LOGOPrinter.java b/src/main/java/com/thebeastshop/liteflow/util/LOGOPrinter.java new file mode 100644 index 000000000..1bc27212f --- /dev/null +++ b/src/main/java/com/thebeastshop/liteflow/util/LOGOPrinter.java @@ -0,0 +1,23 @@ +package com.thebeastshop.liteflow.util; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class LOGOPrinter { + + private static final Logger LOG = LoggerFactory.getLogger(LOGOPrinter.class); + + public static void print() { + StringBuilder str = new StringBuilder("\n"); + str.append("================================================================================================\n"); + str.append(" _ ___ _____ _____ _____ _ _____ __\n"); + str.append(" | | |_ _|_ _| ____| | ___| | / _ \\ \\ / /\n"); + str.append(" | | | | | | | _| _____| |_ | | | | | \\ \\ /\\ / / \n"); + str.append(" | |___ | | | | | |__|_____| _| | |__| |_| |\\ V V / \n"); + str.append(" |_____|___| |_| |_____| |_| |_____\\___/ \\_/\\_/ \n\n"); + str.append(" 做最轻量级,最实用的微流程框架\n"); + str.append(" To be the most lightweight and the most practical micro-process framework\n"); + str.append("================================================================================================\n"); + LOG.info(str.toString()); + } +} diff --git a/src/test/java/com/thebeastshop/liteflow/test/TestCustomParser.java b/src/test/java/com/thebeastshop/liteflow/test/TestCustomParser.java new file mode 100644 index 000000000..652a90666 --- /dev/null +++ b/src/test/java/com/thebeastshop/liteflow/test/TestCustomParser.java @@ -0,0 +1,13 @@ +package com.thebeastshop.liteflow.test; + +import com.thebeastshop.liteflow.parser.ClassXmlFlowParser; + +public class TestCustomParser extends ClassXmlFlowParser { + + @Override + public String parseCustom() { + System.out.println("进入自定义parser,这里只做进入作用,不返回具体xml"); + return null; + } + +} diff --git a/src/test/java/com/thebeastshop/liteflow/test/TestWithSpringMain.java b/src/test/java/com/thebeastshop/liteflow/test/TestWithSpringMain.java index a5e7f777e..61897ce48 100644 --- a/src/test/java/com/thebeastshop/liteflow/test/TestWithSpringMain.java +++ b/src/test/java/com/thebeastshop/liteflow/test/TestWithSpringMain.java @@ -19,7 +19,7 @@ import com.thebeastshop.liteflow.entity.data.Slot; @ContextConfiguration(locations = { "classpath:spring-test.xml" }) public class TestWithSpringMain { - @Resource + @Resource(name="flowExecutor") private FlowExecutor flowExecutor; @Test @@ -50,4 +50,17 @@ public class TestWithSpringMain { } } + + @Test + public void test3() throws Exception { + try { + while(true) { + Slot slot = flowExecutor.execute("chain3", "it's a request"); + Thread.sleep(2000); + } + }catch(Exception e) { + e.printStackTrace(); + } + + } } diff --git a/src/test/java/com/thebeastshop/liteflow/test/curator/CuratorTest.java b/src/test/java/com/thebeastshop/liteflow/test/curator/CuratorTest.java new file mode 100644 index 000000000..3d6989dcc --- /dev/null +++ b/src/test/java/com/thebeastshop/liteflow/test/curator/CuratorTest.java @@ -0,0 +1,135 @@ +package com.thebeastshop.liteflow.test.curator; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.recipes.cache.NodeCache; +import org.apache.curator.framework.recipes.cache.NodeCacheListener; +import org.apache.curator.framework.recipes.cache.PathChildrenCache; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; +import org.apache.curator.retry.RetryNTimes; + +public class CuratorTest { + + /** Zookeeper info */ + private static final String ZK_ADDRESS = "123.206.92.144:2181,123.206.92.144:2182,123.206.92.144:2183"; + private static final String ZK_PATH = "/zktest/a1/aa1"; + + public static void main(String[] args) throws Exception { + // 1.Connect to zk + CuratorFramework client = CuratorFrameworkFactory.newClient( + ZK_ADDRESS, + new RetryNTimes(10, 5000) + ); + client.start(); + + checkNode(client); + +// childNodeListen(client); + +// removeNodeData(client); + +// createNode(client); + +// nodeListen(client); +// +// modifyNodeData(client); + + System.in.read(); + +// getNodeData(client); +// +// + + } + + private static void checkNode(CuratorFramework client) throws Exception { + System.out.println(client.checkExists().forPath("/test")); + } + + private static void createNode(CuratorFramework client) throws Exception { + String data1 = "nice to meet you"; + print("create", ZK_PATH, data1); + client.create(). + creatingParentsIfNeeded(). + forPath(ZK_PATH, data1.getBytes()); + } + + private static void getNodeData(CuratorFramework client) throws Exception { + print("ls", "/"); + print(client.getChildren().forPath("/")); + print("get", ZK_PATH); + print(client.getData().forPath(ZK_PATH)); + } + + private static void modifyNodeData(CuratorFramework client) throws Exception { + String data2 = "world for u"; + print("set", ZK_PATH, data2); + client.setData().forPath(ZK_PATH, data2.getBytes()); + print("get", ZK_PATH); + print(client.getData().forPath(ZK_PATH)); + } + + private static void removeNodeData(CuratorFramework client) throws Exception { + print("delete", "/zktest/dddd"); + client.delete().forPath("/zktest/dddd"); + print("ls", "/"); + print(client.getChildren().forPath("/")); + } + + private static void nodeListen(CuratorFramework client) throws Exception { + final NodeCache cache = new NodeCache(client,ZK_PATH); + cache.start(); + + cache.getListenable().addListener(new NodeCacheListener() { + + @Override + public void nodeChanged() throws Exception { + byte[] res = cache.getCurrentData().getData(); + System.out.println("data: " + new String(res)); + } + }); + } + + private static void childNodeListen(CuratorFramework client) throws Exception { + final PathChildrenCache cache = new PathChildrenCache(client,"/zktest",true); + cache.start(); + + cache.getListenable().addListener(new PathChildrenCacheListener() { + + @Override + public void childEvent(CuratorFramework curator, PathChildrenCacheEvent event) throws Exception { + switch (event.getType()) { + case CHILD_ADDED: + System.out.println("add:" + event.getData().getPath() + ":" + new String(event.getData().getData())); + break; + case CHILD_UPDATED: + System.out.println("update:" + event.getData().getPath() + ":" + new String(event.getData().getData())); + break; + case CHILD_REMOVED: + System.out.println("remove:" + event.getData().getPath() + ":" + new String(event.getData().getData())); + break; + default: + break; + } + } + }); + } + + + private static void print(String... cmds) { + StringBuilder text = new StringBuilder("$ "); + for (String cmd : cmds) { + text.append(cmd).append(" "); + } + System.out.println(text.toString()); + } + + private static void print(Object result) { + System.out.println( + result instanceof byte[] + ? new String((byte[]) result) + : result); + } + +} diff --git a/src/test/java/com/thebeastshop/liteflow/test/curator/CuratorTest2.java b/src/test/java/com/thebeastshop/liteflow/test/curator/CuratorTest2.java new file mode 100644 index 000000000..232ecace1 --- /dev/null +++ b/src/test/java/com/thebeastshop/liteflow/test/curator/CuratorTest2.java @@ -0,0 +1,121 @@ +package com.thebeastshop.liteflow.test.curator; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.recipes.cache.NodeCache; +import org.apache.curator.framework.recipes.cache.NodeCacheListener; +import org.apache.curator.framework.recipes.cache.PathChildrenCache; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; +import org.apache.curator.retry.RetryNTimes; + +public class CuratorTest2 { + + /** Zookeeper info */ + private static final String ZK_ADDRESS = "114.55.174.189:2181"; + private static final String ZK_PATH = "/zktest/ffff"; + + public static void main(String[] args) throws Exception { + // 1.Connect to zk + CuratorFramework client = CuratorFrameworkFactory.newClient( + ZK_ADDRESS, + new RetryNTimes(10, 5000) + ); + client.start(); + +// removeNodeData(client); + +// createNode(client); + +// nodeListen(client); +// + modifyNodeData(client); + + } + + private static void createNode(CuratorFramework client) throws Exception { + String data1 = "hello"; + print("create", ZK_PATH, data1); + client.create(). + creatingParentsIfNeeded(). + forPath(ZK_PATH, data1.getBytes()); + } + + private static void getNodeData(CuratorFramework client) throws Exception { + print("ls", "/"); + print(client.getChildren().forPath("/")); + print("get", ZK_PATH); + print(client.getData().forPath(ZK_PATH)); + } + + private static void modifyNodeData(CuratorFramework client) throws Exception { + String data2 = "world for u"; + print("set", ZK_PATH, data2); + client.setData().forPath(ZK_PATH, data2.getBytes()); + print("get", ZK_PATH); + print(client.getData().forPath(ZK_PATH)); + } + + private static void removeNodeData(CuratorFramework client) throws Exception { + print("delete", "/zktest/dddd"); + client.delete().forPath("/zktest/dddd"); + print("ls", "/"); + print(client.getChildren().forPath("/")); + } + + private static void nodeListen(CuratorFramework client) throws Exception { + final NodeCache cache = new NodeCache(client,ZK_PATH); + cache.start(); + + cache.getListenable().addListener(new NodeCacheListener() { + + @Override + public void nodeChanged() throws Exception { + byte[] res = cache.getCurrentData().getData(); + System.out.println("data: " + new String(res)); + } + }); + } + + private static void childNodeListen(CuratorFramework client) throws Exception { + final PathChildrenCache cache = new PathChildrenCache(client,"/zktest",true); + cache.start(); + + cache.getListenable().addListener(new PathChildrenCacheListener() { + + @Override + public void childEvent(CuratorFramework curator, PathChildrenCacheEvent event) throws Exception { + switch (event.getType()) { + case CHILD_ADDED: + System.out.println("add:" + event.getData().getPath() + ":" + new String(event.getData().getData())); + break; + case CHILD_UPDATED: + System.out.println("update:" + event.getData().getPath() + ":" + new String(event.getData().getData())); + break; + case CHILD_REMOVED: + System.out.println("remove:" + event.getData().getPath() + ":" + new String(event.getData().getData())); + break; + default: + break; + } + } + }); + } + + + private static void print(String... cmds) { + StringBuilder text = new StringBuilder("$ "); + for (String cmd : cmds) { + text.append(cmd).append(" "); + } + System.out.println(text.toString()); + } + + private static void print(Object result) { + System.out.println( + result instanceof byte[] + ? new String((byte[]) result) + : result); + } + +} diff --git a/src/test/java/com/thebeastshop/liteflow/test/regex/RegexTest.java b/src/test/java/com/thebeastshop/liteflow/test/regex/RegexTest.java new file mode 100644 index 000000000..c1efeebd9 --- /dev/null +++ b/src/test/java/com/thebeastshop/liteflow/test/regex/RegexTest.java @@ -0,0 +1,23 @@ +package com.thebeastshop.liteflow.test.regex; + +import java.util.ArrayList; +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public class RegexTest { + + public static void main(String[] args) { + String str = "192.168.1.1:2181,192.168.1.2:2182,192.168.1.3:2183"; + List list = new ArrayList(); + Pattern p = Pattern.compile("[\\w\\d][\\w\\d\\.]+\\:(\\d)+(\\,[\\w\\d][\\w\\d\\.]+\\:(\\d)+)*"); + Matcher m = p.matcher(str); + while(m.find()){ + list.add(m.group()); + } + System.out.println(list.size()); + System.out.println(list); + + } + +} diff --git a/src/test/resources/spring-test.xml b/src/test/resources/spring-test.xml index 685098480..26ff8018a 100644 --- a/src/test/resources/spring-test.xml +++ b/src/test/resources/spring-test.xml @@ -10,6 +10,7 @@ + @@ -17,4 +18,23 @@ + + + + + + \ No newline at end of file