Merge branch 'master' into dev

# Conflicts:
#	liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/Chain.java
#	liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/subflow/cmp2/FCmp.java
#	liteflow-testcase-springboot/src/test/java/com/yomahub/liteflow/test/subflow/cmp2/HCmp.java
This commit is contained in:
everywhere.z
2022-06-13 18:39:11 +08:00
26 changed files with 233 additions and 86 deletions

View File

@@ -9,7 +9,7 @@
<parent>
<groupId>com.yomahub</groupId>
<artifactId>liteflow</artifactId>
<version>2.7.1</version>
<version>2.7.2</version>
</parent>
<dependencies>

View File

@@ -370,7 +370,6 @@ public class FlowExecutor {
if (ObjectUtil.isNotNull(param)){
slot.setRequestData(param);
}
slot.setChainName(chainId);
} else {
if (ObjectUtil.isNotNull(param)){
slot.setChainReqData(chainId, param);

View File

@@ -296,6 +296,10 @@ public abstract class NodeComponent{
return getSlot().getRequestData();
}
public <T> T getSubChainReqData(){
return getSlot().getChainReqData(this.getChainName());
}
public String getChainName(){
return getSlot().getChainName();
}

View File

@@ -31,15 +31,8 @@ public class Chain implements Executable {
private String chainName;
//主体Condition
private List<Condition> conditionList = new ArrayList<>();
//前置处理Condition用来区别主体的Condition
private List<Condition> preConditionList = new ArrayList<>();
//后置处理Condition用来区别主体的Condition
private List<Condition> finallyConditionList = new ArrayList<>();
public Chain(String chainName){
this.chainName = chainName;
}
@@ -51,57 +44,6 @@ public class Chain implements Executable {
this.conditionList = conditionList;
}
//执行chain的主方法
@Override
public void execute(Integer slotIndex) throws Exception {
if (CollUtil.isEmpty(conditionList)) {
throw new FlowSystemException("no conditionList in this chain[" + chainName + "]");
}
try {
//执行前置
this.executePre(slotIndex);
//执行主体Condition
for (Condition condition : conditionList) {
condition.execute(slotIndex);
}
}catch (ChainEndException e){
//这里单独catch ChainEndException是因为ChainEndException是用户自己setIsEnd抛出的异常
//是属于正常逻辑所以会在FlowExecutor中判断。这里不作为异常处理
throw e;
}catch (Exception e){
//这里事先取到exception set到slot里为了方便finally取到exception
Slot<?> slot = DataBus.getSlot(slotIndex);
slot.setException(e);
throw e;
}finally {
//执行后置
this.executeFinally(slotIndex);
}
}
// 执行pre节点
private void executePre(Integer slotIndex) throws Exception {
for (Condition condition : this.preConditionList){
condition.execute(slotIndex);
}
}
private void executeFinally(Integer slotIndex) throws Exception {
for (Condition condition : this.finallyConditionList){
condition.execute(slotIndex);
}
}
@Override
public ExecuteTypeEnum getExecuteType() {
return ExecuteTypeEnum.CHAIN;
}
@Override
public String getExecuteName() {
return chainName;
}
public List<Condition> getConditionList() {
return conditionList;
}
@@ -118,19 +60,174 @@ public class Chain implements Executable {
this.chainName = chainName;
}
public List<Condition> getPreConditionList() {
return preConditionList;
//执行chain的主方法
@Override
public void execute(Integer slotIndex) throws Exception {
if (CollUtil.isEmpty(conditionList)) {
throw new FlowSystemException("no conditionList in this chain[" + chainName + "]");
}
Slot<?> slot = DataBus.getSlot(slotIndex);
try {
//执行前置
this.executePre(slotIndex);
//执行主体Condition
for (Condition condition : conditionList) {
condition.execute(slotIndex);
}
}catch (ChainEndException e){
//这里单独catch ChainEndException是因为ChainEndException是用户自己setIsEnd抛出的异常
//是属于正常逻辑所以会在FlowExecutor中判断。这里不作为异常处理
throw e;
}catch (Exception e){
//这里事先取到exception set到slot里为了方便finally取到exception
slot.setException(e);
throw e;
}finally {
//执行后置
this.executeFinally(slotIndex);
//流程结束后需要把当前的chainName从stack结构中移出
//里面的逻辑判断了当只剩根chainName的时候不移除
slot.popChainName();
}
}
public void setPreConditionList(List<Condition> preConditionList) {
this.preConditionList = preConditionList;
// 执行pre节点
public void executePre(Integer slotIndex) throws Exception {
doExecuteForPointConditionType(slotIndex, ConditionTypeEnum.TYPE_PRE);
}
public List<Condition> getFinallyConditionList() {
return finallyConditionList;
public void executeFinally(Integer slotIndex) throws Exception {
doExecuteForPointConditionType(slotIndex, ConditionTypeEnum.TYPE_FINALLY);
}
public void setFinallyConditionList(List<Condition> finallyConditionList) {
this.finallyConditionList = finallyConditionList;
// 执行指定的conditionType的节点
private void doExecuteForPointConditionType(Integer slotIndex, ConditionTypeEnum conditionTypeEnum) throws Exception {
//先把指定condition类型的节点过滤出来
List<Condition> conditions =filterCondition(conditionTypeEnum);
for (Condition condition : conditions){
for(Executable executableItem : condition.getNodeList()){
executableItem.execute(slotIndex);
}
}
}
// 根据节点condition类型过去出节点列表
private List<Condition> filterCondition(ConditionTypeEnum conditionTypeEnum) {
assert conditionTypeEnum != null;
return conditionList.stream().filter(condition ->
condition.getConditionType().equals(conditionTypeEnum)).collect(Collectors.toList());
}
@Override
public ExecuteTypeEnum getExecuteType() {
return ExecuteTypeEnum.CHAIN;
}
@Override
public String getExecuteName() {
return chainName;
}
//使用线程池执行when并发流程
//这块涉及到挺多的多线程逻辑,所以注释比较详细,看到这里的童鞋可以仔细阅读
private void executeAsyncCondition(WhenCondition condition, Integer slotIndex) throws Exception{
Slot slot = DataBus.getSlot(slotIndex);
//此方法其实只会初始化一次Executor不会每次都会初始化。Executor是唯一的
ExecutorService parallelExecutor = ExecutorHelper.loadInstance().buildWhenExecutor(condition.getThreadExecutorClass());
//获得liteflow的参数
LiteflowConfig liteflowConfig = LiteflowConfigGetter.get();
//定义是否中断参数
//这里为什么要定义成数组呢因为后面lambda要用到根据final不能修改引用的原则这里用了数组对象
final boolean[] interrupted = {false};
//这里主要是做了封装CompletableFuture对象用lumbda表达式做了很多事情这句代码要仔细理清
//1.根据condition.getNodeList()的集合进行流处理用map进行把executable对象转换成List<CompletableFuture<WhenFutureObj>>
//2.在转的过程中套入CompletableFutureTimeout方法进行超时判断如果超时则用WhenFutureObj.timeOut返回超时的对象
//3.第2个参数是主要的本体CompletableFuture传入了ParallelSupplier和线程池对象
List<CompletableFuture<WhenFutureObj>> completableFutureList = condition.getNodeList().stream().filter(executable -> {
try {
return executable.isAccess(slotIndex);
}catch (Exception e){
LOG.error("there was an error when executing the when component isAccess",e);
return false;
}
}).map(executable -> CompletableFutureTimeout.completeOnTimeout(
WhenFutureObj.timeOut(executable.getExecuteName()),
CompletableFuture.supplyAsync(new ParallelSupplier(executable, slotIndex), parallelExecutor),
liteflowConfig.getWhenMaxWaitSeconds(),
TimeUnit.SECONDS
)).collect(Collectors.toList());
CompletableFuture<?> resultCompletableFuture;
//这里判断执行方式
//如果any为false说明这些异步任务全部执行好或者超时才返回
//如果any为true说明这些异步任务只要任意一个执行完成就返回
if(condition.isAny()){
//把这些CompletableFuture通过anyOf合成一个CompletableFuture
resultCompletableFuture = CompletableFuture.anyOf(completableFutureList.toArray(new CompletableFuture[]{}));
}else{
//把这些CompletableFuture通过allOf合成一个CompletableFuture
resultCompletableFuture = CompletableFuture.allOf(completableFutureList.toArray(new CompletableFuture[]{}));
}
try {
//进行执行,这句执行完后,就意味着所有的任务要么执行完毕,要么超时返回
resultCompletableFuture.get();
} catch (InterruptedException | ExecutionException e) {
LOG.error("there was an error when executing the CompletableFuture",e);
interrupted[0] = true;
}
//拿到已经完成的CompletableFuture
//如果any为false那么所有任务都已经完成
//如果any为true那么这里拿到的是第一个完成的任务
//这里过滤和转换一起用lumbda做了
List<WhenFutureObj> allCompletableWhenFutureObjList = completableFutureList.stream().filter(f -> {
//过滤出已经完成的,没完成的就直接终止
if (f.isDone()){
return true;
}else{
f.cancel(true);
return false;
}
}).map(f -> {
try {
return f.get();
} catch (InterruptedException | ExecutionException e) {
interrupted[0] = true;
return null;
}
}).collect(Collectors.toList());
//判断超时上面已经拿到了所有已经完成的CompletableFuture
//那我们只要过滤出超时的CompletableFuture
List<WhenFutureObj> timeOutWhenFutureObjList = allCompletableWhenFutureObjList.stream().filter(WhenFutureObj::isTimeout).collect(Collectors.toList());
//输出超时信息
timeOutWhenFutureObjList.forEach(whenFutureObj ->
LOG.warn("requestId [{}] executing thread has reached max-wait-seconds, thread canceled.Execute-item: [{}]", slot.getRequestId(), whenFutureObj.getExecutorName()));
//当配置了errorResume = false出现interrupted或者!f.get()的情况将抛出WhenExecuteException
if (!condition.isErrorResume()) {
if (interrupted[0]) {
throw new WhenExecuteException(StrUtil.format("requestId [{}] when execute interrupted. errorResume [false].", slot.getRequestId()));
}
//循环判断CompletableFuture的返回值如果异步执行失败则抛出相应的业务异常
for(WhenFutureObj whenFutureObj : allCompletableWhenFutureObjList){
if (!whenFutureObj.isSuccess()){
LOG.info(StrUtil.format("requestId [{}] when-executor[{}] execute failed. errorResume [false].", whenFutureObj.getExecutorName(), slot.getRequestId()));
throw whenFutureObj.getEx();
}
}
} else if (interrupted[0]) {
// 这里由于配置了errorResume所以只打印warn日志
LOG.warn("requestId [{}] executing when condition timeout , but ignore with errorResume.", slot.getRequestId());
}
}
}

View File

@@ -15,6 +15,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Iterator;
import java.util.Queue;
import java.util.Stack;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -63,6 +64,10 @@ public class Slot<O>{
this.contextBean = contextBean;
}
private boolean hasMetaData(String key){
return metaDataMap.containsKey(key);
}
private <T> void putMetaDataMap(String key, T t) {
if (ObjectUtil.isNull(t)) {
//data slot is a ConcurrentHashMap, so null value will trigger NullPointerException
@@ -152,12 +157,33 @@ public class Slot<O>{
return (T) metaDataMap.get(COND_NODE_PREFIX + key);
}
public void setChainName(String chainName) {
putMetaDataMap(CHAIN_NAME, chainName);
public void pushChainName(String chainName) {
if (this.hasMetaData(CHAIN_NAME)){
Stack<String> stack = (Stack<String>)metaDataMap.get(CHAIN_NAME);
stack.push(chainName);
}else{
Stack<String> stack = new Stack<>();
stack.push(chainName);
this.putMetaDataMap(CHAIN_NAME, stack);
}
}
public void popChainName(){
if (this.hasMetaData(CHAIN_NAME)){
Stack<String> stack = (Stack<String>)metaDataMap.get(CHAIN_NAME);
if (stack.size() > 1){
stack.pop();
}
}
}
public String getChainName() {
return (String) metaDataMap.get(CHAIN_NAME);
try{
Stack<String> stack = (Stack<String>)metaDataMap.get(CHAIN_NAME);
return stack.peek();
}catch (Exception e){
return null;
}
}
public void addStep(CmpStep step){

View File

@@ -19,7 +19,7 @@ public class LOGOPrinter {
str.append(" | | | | | | | _| _____| |_ | | | | | \\ \\ /\\ / / \n");
str.append(" | |___ | | | | | |__|_____| _| | |__| |_| |\\ V V / \n");
str.append(" |_____|___| |_| |_____| |_| |_____\\___/ \\_/\\_/ \n\n");
str.append(" Version: v2.7.1\n");
str.append(" Version: v2.7.2\n");
str.append(" 轻量且强大的规则引擎框架。\n");
str.append(" Small but powerful rules engine.\n");
str.append("================================================================================================\n");

View File

@@ -5,7 +5,7 @@
<parent>
<artifactId>liteflow</artifactId>
<groupId>com.yomahub</groupId>
<version>2.7.1</version>
<version>2.7.2</version>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@@ -5,7 +5,7 @@
<parent>
<artifactId>liteflow</artifactId>
<groupId>com.yomahub</groupId>
<version>2.7.1</version>
<version>2.7.2</version>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@@ -5,7 +5,7 @@
<parent>
<artifactId>liteflow</artifactId>
<groupId>com.yomahub</groupId>
<version>2.7.1</version>
<version>2.7.2</version>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@@ -10,7 +10,7 @@
<parent>
<artifactId>liteflow</artifactId>
<groupId>com.yomahub</groupId>
<version>2.7.1</version>
<version>2.7.2</version>
</parent>
<dependencies>

View File

@@ -5,7 +5,7 @@
<parent>
<artifactId>liteflow</artifactId>
<groupId>com.yomahub</groupId>
<version>2.7.1</version>
<version>2.7.2</version>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@@ -5,7 +5,7 @@
<parent>
<artifactId>liteflow</artifactId>
<groupId>com.yomahub</groupId>
<version>2.7.1</version>
<version>2.7.2</version>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@@ -45,5 +45,7 @@ public class ImplicitSubFlowSpringbootTest extends BaseTest {
Assert.assertEquals(1, RUN_TIME_SLOT.size());
// set中第一次设置的requestId和response中的requestId一致
Assert.assertTrue(RUN_TIME_SLOT.contains(response.getSlot().getRequestId()));
//requestData的取值正确
Assert.assertEquals("it's implicit subflow.", response.getContextBean().getData("innerRequest"));
}
}

View File

@@ -4,6 +4,7 @@ import com.yomahub.liteflow.annotation.LiteflowCmpDefine;
import com.yomahub.liteflow.annotation.LiteflowMethod;
import com.yomahub.liteflow.core.NodeComponent;
import com.yomahub.liteflow.enums.LiteFlowMethodEnum;
import com.yomahub.liteflow.slot.DefaultContext;
import org.springframework.stereotype.Component;
import static com.yomahub.liteflow.test.subflow.ImplicitSubFlowSpringbootTest.RUN_TIME_SLOT;
@@ -14,6 +15,9 @@ import static com.yomahub.liteflow.test.subflow.ImplicitSubFlowSpringbootTest.RU
public class HCmp{
@LiteflowMethod(LiteFlowMethodEnum.PROCESS)
public void process(NodeComponent bindCmp) throws Exception {
String requestData = bindCmp.getSubChainReqData();
DefaultContext context = bindCmp.getContextBean();
context.setData("innerRequest", requestData);
RUN_TIME_SLOT.add(bindCmp.getSlot().getRequestId());

View File

@@ -5,7 +5,7 @@
<parent>
<artifactId>liteflow</artifactId>
<groupId>com.yomahub</groupId>
<version>2.7.1</version>
<version>2.7.2</version>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@@ -42,5 +42,7 @@ public class ImplicitSubFlowTest extends BaseTest {
Assert.assertEquals(1, RUN_TIME_SLOT.size());
// set中第一次设置的requestId和response中的requestId一致
Assert.assertTrue(RUN_TIME_SLOT.contains(response.getSlot().getRequestId()));
//requestData的取值正确
Assert.assertEquals("it's implicit subflow.", response.getContextBean().getData("innerRequest"));
}
}

View File

@@ -1,12 +1,17 @@
package com.yomahub.liteflow.test.subflow.cmp2;
import com.yomahub.liteflow.core.NodeComponent;
import com.yomahub.liteflow.slot.DefaultContext;
import static com.yomahub.liteflow.test.subflow.ImplicitSubFlowTest.RUN_TIME_SLOT;
public class HCmp extends NodeComponent {
@Override
public void process() throws Exception {
String requestData = this.getSubChainReqData();
DefaultContext context = this.getContextBean();
context.setData("innerRequest", requestData);
RUN_TIME_SLOT.add(this.getSlot().getRequestId());

View File

@@ -5,7 +5,7 @@
<parent>
<artifactId>liteflow</artifactId>
<groupId>com.yomahub</groupId>
<version>2.7.1</version>
<version>2.7.2</version>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@@ -5,7 +5,7 @@
<parent>
<artifactId>liteflow</artifactId>
<groupId>com.yomahub</groupId>
<version>2.7.1</version>
<version>2.7.2</version>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@@ -5,7 +5,7 @@
<parent>
<artifactId>liteflow</artifactId>
<groupId>com.yomahub</groupId>
<version>2.7.1</version>
<version>2.7.2</version>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@@ -45,5 +45,7 @@ public class ImplicitSubFlowSpringbootTest extends BaseTest {
Assert.assertEquals(1, RUN_TIME_SLOT.size());
// set中第一次设置的requestId和response中的requestId一致
Assert.assertTrue(RUN_TIME_SLOT.contains(response.getSlot().getRequestId()));
//requestData的取值正确
Assert.assertEquals("it's implicit subflow.", response.getContextBean().getData("innerRequest"));
}
}

View File

@@ -11,9 +11,9 @@ import static com.yomahub.liteflow.test.subflow.ImplicitSubFlowSpringbootTest.RU
public class HCmp extends NodeComponent {
@Override
public void process() throws Exception {
String requestData = this.getSubChainReqData();
DefaultContext context = this.getContextBean();
String str = context.getData("innerRequestData");
System.out.println(str);
context.setData("innerRequest", requestData);
RUN_TIME_SLOT.add(this.getSlot().getRequestId());

View File

@@ -5,7 +5,7 @@
<parent>
<artifactId>liteflow</artifactId>
<groupId>com.yomahub</groupId>
<version>2.7.1</version>
<version>2.7.2</version>
</parent>
<modelVersion>4.0.0</modelVersion>

View File

@@ -39,5 +39,7 @@ public class ImplicitSubFlowSpringTest extends BaseTest {
Assert.assertEquals(1, RUN_TIME_SLOT.size());
// set中第一次设置的requestId和response中的requestId一致
Assert.assertTrue(RUN_TIME_SLOT.contains(response.getSlot().getRequestId()));
//requestData的取值正确
Assert.assertEquals("it's implicit subflow.", response.getContextBean().getData("innerRequest"));
}
}

View File

@@ -1,6 +1,7 @@
package com.yomahub.liteflow.test.subflow.cmp2;
import com.yomahub.liteflow.core.NodeComponent;
import com.yomahub.liteflow.slot.DefaultContext;
import org.springframework.stereotype.Component;
import static com.yomahub.liteflow.test.subflow.ImplicitSubFlowSpringTest.RUN_TIME_SLOT;
@@ -10,6 +11,9 @@ import static com.yomahub.liteflow.test.subflow.ImplicitSubFlowSpringTest.RUN_TI
public class HCmp extends NodeComponent {
@Override
public void process() throws Exception {
String requestData = this.getSubChainReqData();
DefaultContext context = this.getContextBean();
context.setData("innerRequest", requestData);
RUN_TIME_SLOT.add(this.getSlot().getRequestId());

View File

@@ -5,7 +5,7 @@
<groupId>com.yomahub</groupId>
<artifactId>liteflow</artifactId>
<packaging>pom</packaging>
<version>2.7.1</version>
<version>2.7.2</version>
<name>liteflow</name>
<description>a lightweight and practical micro-process framework</description>
<url>https://github.com/bryan31/liteflow</url>