enhancement #I66KP6 从底层改变NodeComponent和Node之间的实例关联问题

This commit is contained in:
everywhere.z
2022-12-17 17:43:02 +08:00
parent 1a1cb5ee56
commit ea5c828078
48 changed files with 102 additions and 142 deletions

View File

@@ -133,7 +133,6 @@ public class OperatorHelper {
public static <T> T convert(Object object, Class<T> clazz, String errorMsg) throws QLException {
try {
if (clazz.isAssignableFrom(object.getClass())) {
if (clazz.equals(Node.class)) {
Node node = (Node) object;
return (T) node.copy();

View File

@@ -12,6 +12,7 @@ import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.alibaba.ttl.TransmittableThreadLocal;
import com.yomahub.liteflow.flow.LiteflowResponse;
import com.yomahub.liteflow.flow.element.Node;
import com.yomahub.liteflow.flow.executor.NodeExecutor;
import com.yomahub.liteflow.flow.executor.DefaultNodeExecutor;
import com.yomahub.liteflow.enums.NodeTypeEnum;
@@ -58,7 +59,10 @@ public abstract class NodeComponent{
/** 节点执行器的类全名 */
private Class<? extends NodeExecutor> nodeExecutorClass = DefaultNodeExecutor.class;
/********************以下的属性为线程附加属性,并非不变属性********************/
/**当前对象为单例注册进spring上下文但是node实例不是单例这里通过对node实例的引用来获得一些链路属性**/
private final TransmittableThreadLocal<Node> refNodeTL = new TransmittableThreadLocal<>();
/********************以下的属性为线程附加属性********************/
//当前slot的index
private final TransmittableThreadLocal<Integer> slotIndexTL = new TransmittableThreadLocal<>();
@@ -66,15 +70,6 @@ public abstract class NodeComponent{
//是否结束整个流程,这个只对串行流程有效,并行流程无效
private final TransmittableThreadLocal<Boolean> isEndTL = new TransmittableThreadLocal<>();
//tag标签
private final TransmittableThreadLocal<String> tagTL = new TransmittableThreadLocal<>();
//当前流程名字
private final TransmittableThreadLocal<String> currChainNameTL = new TransmittableThreadLocal<>();
//组件外部参数
private final TransmittableThreadLocal<String> cmpDataTL = new TransmittableThreadLocal<>();
public NodeComponent() {
}
@@ -83,7 +78,7 @@ public abstract class NodeComponent{
//在元数据里加入step信息
CmpStep cmpStep = new CmpStep(nodeId, name, CmpStepTypeEnum.SINGLE);
cmpStep.setTag(tagTL.get());
cmpStep.setTag(this.getTag());
slot.addStep(cmpStep);
StopWatch stopWatch = new StopWatch();
@@ -273,16 +268,8 @@ public abstract class NodeComponent{
this.nodeExecutorClass = nodeExecutorClass;
}
public void setTag(String tag){
this.tagTL.set(tag);
}
public String getTag(){
return this.tagTL.get();
}
public void removeTag(){
this.tagTL.remove();
return this.refNodeTL.get().getTag();
}
public MonitorBus getMonitorBus() {
@@ -298,11 +285,11 @@ public abstract class NodeComponent{
}
public <T> T getSubChainReqData(){
return getSlot().getChainReqData(this.getCurrChainName());
return getSlot().getChainReqData(this.getCurrChainId());
}
public <T> T getSubChainReqDataInAsync(){
return getSlot().getChainReqDataFromQueue(this.getCurrChainName());
return getSlot().getChainReqDataFromQueue(this.getCurrChainId());
}
/**
@@ -326,61 +313,31 @@ public abstract class NodeComponent{
}
}
/**
*
* @param currChainName 当前chain名称
* @deprecated 请使用 {@link #setCurrChainId(String)}
*/
@Deprecated
public void setCurrChainName(String currChainName){
this.currChainNameTL.set(currChainName);
}
/**
* @deprecated 请使用 {@link #getCurrChainId()}
* @return String
*/
@Deprecated
public String getCurrChainName(){
return this.currChainNameTL.get();
}
/**
* @deprecated 请使用 {@link #removeCurrChainId()}
*/
@Deprecated
public void removeCurrChainName(){
this.currChainNameTL.remove();
}
public void setCurrChainId(String currChainName){
this.currChainNameTL.set(currChainName);
}
public String getCurrChainId(){
return this.currChainNameTL.get();
return getRefNode().getCurrChainId();
}
public void removeCurrChainId(){
this.currChainNameTL.remove();
public Node getRefNode(){
return this.refNodeTL.get();
}
public void setCmpData(String cmpData){
this.cmpDataTL.set(cmpData);
public void setRefNode(Node refNode){
this.refNodeTL.set(refNode);
}
public void removeRefNode(){
this.refNodeTL.remove();
}
public <T> T getCmpData(Class<T> clazz){
if (StrUtil.isBlank(this.cmpDataTL.get())){
String cmpData = getRefNode().getCmpData();
if (StrUtil.isBlank(cmpData)){
return null;
}
if (clazz.equals(String.class) || clazz.equals(Object.class)){
return (T) this.cmpDataTL.get();
return (T) cmpData;
}
return JsonUtil.parseObject(this.cmpDataTL.get(), clazz);
}
public void removeCmpData(){
this.cmpDataTL.remove();
return JsonUtil.parseObject(cmpData, clazz);
}
@Deprecated

View File

@@ -50,6 +50,8 @@ public class Node implements Executable,Cloneable{
private String cmpData;
private String currChainId;
public Node(){
}
@@ -106,8 +108,7 @@ public class Node implements Executable,Cloneable{
try {
//把线程属性赋值给组件对象
instance.setSlotIndex(slotIndex);
instance.setTag(tag);
instance.setCmpData(cmpData);
instance.setRefNode(this);
LiteflowConfig liteflowConfig = LiteflowConfigGetter.get();
@@ -148,9 +149,7 @@ public class Node implements Executable,Cloneable{
//移除threadLocal里的信息
instance.removeSlotIndex();
instance.removeIsEnd();
instance.removeTag();
instance.removeCurrChainId();
instance.removeCmpData();
instance.removeRefNode();
}
}
@@ -162,8 +161,7 @@ public class Node implements Executable,Cloneable{
public boolean isAccess(Integer slotIndex) throws Exception {
//把线程属性赋值给组件对象
instance.setSlotIndex(slotIndex);
instance.setTag(tag);
instance.setCmpData(cmpData);
instance.setRefNode(this);
return instance.isAccess();
}
@@ -210,11 +208,6 @@ public class Node implements Executable,Cloneable{
this.clazz = clazz;
}
@Override
public void setCurrChainId(String currentChainId) {
instance.setCurrChainId(currentChainId);
}
public String getCmpData() {
return cmpData;
}
@@ -222,4 +215,13 @@ public class Node implements Executable,Cloneable{
public void setCmpData(String cmpData) {
this.cmpData = cmpData;
}
@Override
public void setCurrChainId(String currentChainId) {
this.currChainId = currentChainId;
}
public String getCurrChainId() {
return currChainId;
}
}

View File

@@ -32,7 +32,7 @@ public abstract class Condition implements Executable{
* 当前所在的ChainName
* 如果对于子流程来说那这个就是子流程所在的Chain
*/
private String currChainName;
private String currChainId;
@Override
public ExecuteTypeEnum getExecuteType() {
@@ -73,15 +73,15 @@ public abstract class Condition implements Executable{
*/
@Deprecated
public String getCurrChainName() {
return currChainName;
return currChainId;
}
public String getCurrChainId() {
return currChainName;
return currChainId;
}
@Override
public void setCurrChainId(String currChainName) {
this.currChainName = currChainName;
public void setCurrChainId(String currChainId) {
this.currChainId = currChainId;
}
}

View File

@@ -45,7 +45,7 @@ public class ForCondition extends LoopCondition{
executableItem.execute(slotIndex);
//如果break组件不为空则去执行
if (ObjectUtil.isNotNull(breakNode)){
breakNode.setCurrChainName(this.getCurrChainName());
breakNode.setCurrChainId(this.getCurrChainId());
breakNode.execute(slotIndex);
Class<?> originalBreakClass = LiteFlowProxyUtil.getUserClass(this.breakNode.getInstance().getClass());
boolean isBreak = slot.getBreakResult(originalBreakClass.getName());

View File

@@ -41,6 +41,7 @@ public class ThenCondition extends Condition {
public void execute(Integer slotIndex) throws Exception {
try{
for (PreCondition preCondition : preConditionList){
preCondition.setCurrChainId(this.getCurrChainId());
preCondition.execute(slotIndex);
}
@@ -64,6 +65,7 @@ public class ThenCondition extends Condition {
throw e;
}finally {
for (FinallyCondition finallyCondition : finallyConditionList){
finallyCondition.setCurrChainId(this.getCurrChainId());
finallyCondition.execute(slotIndex);
}
}

View File

@@ -29,11 +29,11 @@ public class ParallelSupplier implements Supplier<WhenFutureObj> {
@Override
public WhenFutureObj get() {
try {
executableItem.setCurrChainName(currChainName);
executableItem.setCurrChainId(currChainName);
executableItem.execute(slotIndex);
return WhenFutureObj.success(executableItem.getExecuteName());
return WhenFutureObj.success(executableItem.getExecuteId());
} catch (Exception e){
return WhenFutureObj.fail(executableItem.getExecuteName(), e);
return WhenFutureObj.fail(executableItem.getExecuteId(), e);
}
}
}