enhancement #I3DMX4 FlowExecutor记录业务异常

This commit is contained in:
zendwang
2021-03-29 12:54:41 +08:00
parent 8eac32a78f
commit b980e1ce09
6 changed files with 99 additions and 71 deletions

View File

@@ -7,20 +7,15 @@
*/
package com.yomahub.liteflow.core;
import java.text.MessageFormat;
import java.util.List;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.ReUtil;
import cn.hutool.core.util.StrUtil;
import com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.yomahub.liteflow.entity.flow.Chain;
import com.yomahub.liteflow.entity.data.DataBus;
import com.yomahub.liteflow.entity.data.DefaultSlot;
import com.yomahub.liteflow.entity.data.LiteflowResponse;
import com.yomahub.liteflow.entity.data.Slot;
import com.yomahub.liteflow.entity.flow.Chain;
import com.yomahub.liteflow.exception.ChainNotFoundException;
import com.yomahub.liteflow.exception.ConfigErrorException;
import com.yomahub.liteflow.exception.FlowExecutorNotInitException;
@@ -30,8 +25,11 @@ import com.yomahub.liteflow.parser.LocalXmlFlowParser;
import com.yomahub.liteflow.parser.XmlFlowParser;
import com.yomahub.liteflow.parser.ZookeeperXmlFlowParser;
import com.yomahub.liteflow.property.LiteflowConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ExecutorService;
import java.text.MessageFormat;
import java.util.List;
/**
* 流程规则主要执行器类
@@ -98,23 +96,24 @@ public class FlowExecutor {
public void reloadRule(){
init();
}
public <T extends Slot> T execute(String chainId,Object param) throws Exception{
return execute(chainId, param, DefaultSlot.class,null,false);
public void invoke(String chainId, Object param, Class<? extends Slot> slotClazz, Integer slotIndex) throws Exception {
execute(chainId, param, slotClazz, slotIndex,true);
}
public LiteflowResponse execute(String chainId, Object param) throws Exception {
return execute(chainId, param, DefaultSlot.class, null, false);
}
public <T extends Slot> T execute(String chainId,Object param,Class<? extends Slot> slotClazz) throws Exception{
public LiteflowResponse execute(String chainId, Object param, Class<? extends Slot> slotClazz) throws Exception {
return execute(chainId, param, slotClazz,null,false);
}
public void invoke(String chainId,Object param,Class<? extends Slot> slotClazz,Integer slotIndex) throws Exception{
execute(chainId, param, slotClazz,slotIndex,true);
}
public <T extends Slot> T execute(String chainId,Object param,Class<? extends Slot> slotClazz,Integer slotIndex,boolean isInnerChain) throws Exception{
public LiteflowResponse execute(String chainId, Object param, Class<? extends Slot> slotClazz, Integer slotIndex,
boolean isInnerChain) throws Exception {
Slot slot = null;
if(FlowBus.needInit()) {
if (FlowBus.needInit()) {
init();
}
@@ -150,21 +149,22 @@ public class FlowExecutor {
} else {
slot.setChainReqData(chainId, param);
}
LiteflowResponse<Slot> response = new LiteflowResponse<>(slot);
try {
// 执行chain
chain.execute(slotIndex);
} catch (Exception e) {
response.setSuccess(false);
response.setMessage(e.getMessage());
response.setCause(e.getCause());
LOG.error("[{}]:chain[{}] execute error on slot[{}]", slot.getRequestId(), chain.getChainName(), slotIndex);
slot.setSuccess(false);
slot.setErrorMsg(e.getMessage());
} finally {
if (!isInnerChain) {
slot.printStep();
DataBus.releaseSlot(slotIndex);
}
}
return (T)slot;
return response;
}
public String getZkNode() {

View File

@@ -7,42 +7,38 @@
*/
package com.yomahub.liteflow.entity.data;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Slot的抽象类实现
* @author Bryan.Zhang
*/
@SuppressWarnings("unchecked")
public abstract class AbsSlot implements Slot{
public abstract class AbsSlot implements Slot {
private static final Logger LOG = LoggerFactory.getLogger(Slot.class);
private final String REQUEST = "request";
private static final String REQUEST = "request";
private final String RESPONSE = "response";
private static final String RESPONSE = "response";
private final String CHAINNAME = "chain_name";
private static final String CHAINNAME = "chain_name";
private final String COND_NODE_PREFIX = "cond_";
private static final String COND_NODE_PREFIX = "cond_";
private final String NODE_INPUT_PREFIX = "input_";
private static final String NODE_INPUT_PREFIX = "input_";
private final String NODE_OUTPUT_PREFIX = "output_";
private static final String NODE_OUTPUT_PREFIX = "output_";
private final String CHAIN_REQ_PREFIX = "chain_req_";
private static final String CHAIN_REQ_PREFIX = "chain_req_";
private final String REQUEST_ID = "req_id";
private boolean isSuccess = true;
private String errorMsg;
private static final String REQUEST_ID = "req_id";
private Deque<CmpStep> executeSteps = new ArrayDeque<CmpStep>();
@@ -142,20 +138,4 @@ public abstract class AbsSlot implements Slot{
public Deque<CmpStep> getExecuteSteps() {
return executeSteps;
}
public boolean isSuccess() {
return isSuccess;
}
public void setSuccess(boolean success) {
isSuccess = success;
}
public String getErrorMsg() {
return errorMsg;
}
public void setErrorMsg(String errorMsg) {
this.errorMsg = errorMsg;
}
}

View File

@@ -0,0 +1,58 @@
package com.yomahub.liteflow.entity.data;
import java.io.Serializable;
/**
* 执行结果封装类
* @author zend.wang
*/
public class LiteflowResponse<T> implements Serializable {
private static final long serialVersionUID = -2792556188993845048L;
private boolean success;
private String message;
private Throwable cause;
private T data;
public LiteflowResponse(T data) {
this.success = true;
this.message = "";
this.data = data;
}
public boolean isSuccess() {
return success;
}
public void setSuccess(final boolean success) {
this.success = success;
}
public String getMessage() {
return message;
}
public void setMessage(final String message) {
this.message = message;
}
public Throwable getCause() {
return cause;
}
public void setCause(final Throwable cause) {
this.cause = cause;
}
public T getData() {
return data;
}
public void setData(final T data) {
this.data = data;
}
}

View File

@@ -51,12 +51,4 @@ public interface Slot {
public void setChainName(String chainName);
public String getChainName();
public boolean isSuccess();
public void setSuccess(boolean success);
public String getErrorMsg();
public void setErrorMsg(String errorMsg);
}

View File

@@ -1,7 +1,7 @@
package com.yomahub.flowtest;
import com.yomahub.liteflow.core.FlowExecutor;
import com.yomahub.liteflow.entity.data.Slot;
import com.yomahub.liteflow.entity.data.LiteflowResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.support.ClassPathXmlApplicationContext;
@@ -16,8 +16,8 @@ public class Runner {
context.start();
log.info("启动成功");
FlowExecutor flowExecutor = context.getBean(FlowExecutor.class);
Slot slot = flowExecutor.execute("chain3", "it's a request");
System.out.println(slot);
LiteflowResponse response = flowExecutor.execute("chain3", "it's a request");
System.out.println(response);
while (true){
Thread.sleep(60000);

View File

@@ -1,9 +1,7 @@
package com.yomahub.flowtest;
import com.yomahub.liteflow.core.FlowExecutor;
import com.yomahub.liteflow.entity.data.Slot;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.yomahub.liteflow.entity.data.LiteflowResponse;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
@@ -17,7 +15,7 @@ public class TestFlow implements CommandLineRunner {
@Override
public void run(String... args) throws Exception {
Slot slot = flowExecutor.execute("chain2", "it's a request");
System.out.println(slot);
LiteflowResponse response= flowExecutor.execute("chain2", "it's a request");
System.out.println(response);
}
}