enhancement #I4OTK4 希望finally组件可以获取到then组件的异常对象

This commit is contained in:
bryan31
2022-01-04 20:58:34 +08:00
parent 5693ca199d
commit 5e0aef9349
6 changed files with 79 additions and 34 deletions

View File

@@ -360,7 +360,7 @@ public class FlowExecutor {
chain = FlowBus.getChain(chainId);
if (ObjectUtil.isNull(chain)) {
String errorMsg = StrUtil.format("couldn't find chain with the id[{}]", chainId);
String errorMsg = StrUtil.format("[{}]:couldn't find chain with the id[{}]", slot.getRequestId(), chainId);
throw new ChainNotFoundException(errorMsg);
}
@@ -372,6 +372,14 @@ public class FlowExecutor {
}
slot.setException(e);
} finally {
try{
if (ObjectUtil.isNotNull(chain)){
chain.executeFinally(slotIndex);
}
}catch (Exception e){
LOG.error("[{}]:an exception occurred during the finally Component execution in chain[{}]", slot.getRequestId(), chain.getChainName());
}
if (!isInnerChain) {
slot.printStep();
DataBus.releaseSlot(slotIndex);

View File

@@ -70,33 +70,30 @@ public class Chain implements Executable {
throw new FlowSystemException("no conditionList in this chain[" + chainName + "]");
}
Slot slot = DataBus.getSlot(slotIndex);
//循环chain里包含的condition每一个condition分四种类型pre,then,when,finally
//这里conditionList其实已经是有序的pre一定在最前面finally一定在最后面
for (Condition condition : conditionList) {
if (condition instanceof PreCondition){
for (Executable executableItem : condition.getNodeList()) {
executableItem.execute(slotIndex);
}
} else if (condition instanceof ThenCondition) {
for (Executable executableItem : condition.getNodeList()) {
executableItem.execute(slotIndex);
}
} else if (condition instanceof WhenCondition) {
executeAsyncCondition((WhenCondition) condition, slotIndex);
}
}
}
public void executeFinally(Integer slotIndex) throws Exception {
//先把finally的节点过滤出来
List<Condition> finallyConditionList = conditionList.stream().filter(condition ->
condition.getConditionType().equals(ConditionTypeEnum.TYPE_FINALLY.getType())).collect(Collectors.toList());
//循环chain里包含的condition每一个condition分四种类型pre,then,when,finally
//这里conditionList其实已经是有序的pre一定在最前面finally一定在最后面
try{
for (Condition condition : conditionList) {
if (condition instanceof PreCondition){
for (Executable executableItem : condition.getNodeList()) {
executableItem.execute(slotIndex);
}
} else if (condition instanceof ThenCondition) {
for (Executable executableItem : condition.getNodeList()) {
executableItem.execute(slotIndex);
}
} else if (condition instanceof WhenCondition) {
executeAsyncCondition((WhenCondition) condition, slotIndex, slot.getRequestId());
}
}
}finally {
for (Condition finallyCondition : finallyConditionList){
for(Executable executableItem : finallyCondition.getNodeList()){
executableItem.execute(slotIndex);
}
for (Condition finallyCondition : finallyConditionList){
for(Executable executableItem : finallyCondition.getNodeList()){
executableItem.execute(slotIndex);
}
}
}
@@ -113,7 +110,9 @@ public class Chain implements Executable {
//使用线程池执行when并发流程
//这块涉及到挺多的多线程逻辑,所以注释比较详细,看到这里的童鞋可以仔细阅读
private void executeAsyncCondition(WhenCondition condition, Integer slotIndex, String requestId) throws Exception{
private void executeAsyncCondition(WhenCondition condition, Integer slotIndex) throws Exception{
Slot slot = DataBus.getSlot(slotIndex);
//此方法其实只会初始化一次Executor不会每次都会初始化。Executor是唯一的
ExecutorService parallelExecutor = ExecutorHelper.loadInstance().buildExecutor();
@@ -131,7 +130,7 @@ public class Chain implements Executable {
List<CompletableFuture<WhenFutureObj>> completableFutureList = condition.getNodeList().stream().map(
executable -> CompletableFutureTimeout.completeOnTimeout(
WhenFutureObj.timeOut(executable.getExecuteName()),
CompletableFuture.supplyAsync(new ParallelSupplier(executable, slotIndex, requestId), parallelExecutor),
CompletableFuture.supplyAsync(new ParallelSupplier(executable, slotIndex), parallelExecutor),
liteflowConfig.getWhenMaxWaitSeconds(),
TimeUnit.SECONDS
)
@@ -186,24 +185,24 @@ public class Chain implements Executable {
//输出超时信息
timeOutWhenFutureObjList.forEach(whenFutureObj ->
LOG.warn("requestId [{}] executing thread has reached max-wait-seconds, thread canceled.Execute-item: [{}]", requestId, whenFutureObj.getExecutorName()));
LOG.warn("requestId [{}] executing thread has reached max-wait-seconds, thread canceled.Execute-item: [{}]", slot.getRequestId(), whenFutureObj.getExecutorName()));
//当配置了errorResume = false出现interrupted或者!f.get()的情况将抛出WhenExecuteException
if (!condition.isErrorResume()) {
if (interrupted[0]) {
throw new WhenExecuteException(StrUtil.format("requestId [{}] when execute interrupted. errorResume [false].", requestId));
throw new WhenExecuteException(StrUtil.format("requestId [{}] when execute interrupted. errorResume [false].", slot.getRequestId()));
}
//循环判断CompletableFuture的返回值如果异步执行失败则抛出相应的业务异常
for(WhenFutureObj whenFutureObj : allCompletableWhenFutureObjList){
if (!whenFutureObj.isSuccess()){
LOG.info(StrUtil.format("requestId [{}] when-executor[{}] execute failed. errorResume [false].", whenFutureObj.getExecutorName(), requestId));
LOG.info(StrUtil.format("requestId [{}] when-executor[{}] execute failed. errorResume [false].", whenFutureObj.getExecutorName(), slot.getRequestId()));
throw whenFutureObj.getEx();
}
}
} else if (interrupted[0]) {
// 这里由于配置了errorResume所以只打印warn日志
LOG.warn("requestId [{}] executing when condition timeout , but ignore with errorResume.", requestId);
LOG.warn("requestId [{}] executing when condition timeout , but ignore with errorResume.", slot.getRequestId());
}
}
}

View File

@@ -18,12 +18,9 @@ public class ParallelSupplier implements Supplier<WhenFutureObj> {
private final Integer slotIndex;
private final String requestId;
public ParallelSupplier(Executable executableItem, Integer slotIndex, String requestId) {
public ParallelSupplier(Executable executableItem, Integer slotIndex) {
this.executableItem = executableItem;
this.slotIndex = slotIndex;
this.requestId = requestId;
}
@Override

View File

@@ -53,4 +53,12 @@ public class PreAndFinallySpringbootTest extends BaseTest {
Assert.assertFalse(response.isSuccess());
Assert.assertEquals("p1==>p2==>a==>d==>f1==>f2", response.getSlot().printStep());
}
//测试在finally节点里是否能获取exception
@Test
public void testPreAndFinally4() throws Exception{
LiteflowResponse<DefaultSlot> response = flowExecutor.execute2Resp("chain4", "arg");
Assert.assertFalse(response.isSuccess());
Assert.assertTrue(response.getSlot().getData("hasEx"));
}
}

View File

@@ -0,0 +1,28 @@
/**
* <p>Title: liteflow</p>
* <p>Description: 轻量级的组件式流程框架</p>
* @author Bryan.Zhang
* @email weenyc31@163.com
* @Date 2020/4/1
*/
package com.yomahub.liteflow.test.preAndFinally.cmp;
import cn.hutool.core.util.ObjectUtil;
import com.yomahub.liteflow.core.NodeComponent;
import com.yomahub.liteflow.entity.data.Slot;
import org.springframework.stereotype.Component;
@Component("f3")
public class Finally3Cmp extends NodeComponent {
@Override
public void process() throws Exception{
Slot slot = this.getSlot();
if (ObjectUtil.isNull(slot.getException())){
slot.setData("hasEx", false);
}else{
slot.setData("hasEx", true);
}
System.out.println("Finally3Cmp executed!");
}
}

View File

@@ -18,4 +18,9 @@
<then value="a,d,c"/>
<finally value="f1,f2"/>
</chain>
<chain name="chain4">
<then value="a,d,c"/>
<finally value="f3"/>
</chain>
</flow>