bug #I8MXHX 修复 WhenCondition 下的 node 重复执行 isAccess 方法问题

This commit is contained in:
luoyi
2024-01-15 12:26:33 +08:00
parent 9b72d89164
commit 5c6b8387dd
5 changed files with 22 additions and 54 deletions

View File

@@ -21,8 +21,6 @@ import com.yomahub.liteflow.flow.executor.NodeExecutor;
import com.yomahub.liteflow.flow.executor.NodeExecutorHelper; import com.yomahub.liteflow.flow.executor.NodeExecutorHelper;
import com.yomahub.liteflow.log.LFLog; import com.yomahub.liteflow.log.LFLog;
import com.yomahub.liteflow.log.LFLoggerManager; import com.yomahub.liteflow.log.LFLoggerManager;
import com.yomahub.liteflow.property.LiteflowConfig;
import com.yomahub.liteflow.property.LiteflowConfigGetter;
import com.yomahub.liteflow.slot.DataBus; import com.yomahub.liteflow.slot.DataBus;
import com.yomahub.liteflow.slot.Slot; import com.yomahub.liteflow.slot.Slot;
@@ -30,6 +28,7 @@ import com.yomahub.liteflow.slot.Slot;
* Node节点实现可执行器 Node节点并不是单例的每构建一次都会copy出一个新的实例 * Node节点实现可执行器 Node节点并不是单例的每构建一次都会copy出一个新的实例
* *
* @author Bryan.Zhang * @author Bryan.Zhang
* @author luo yi
*/ */
public class Node implements Executable, Cloneable, Rollbackable{ public class Node implements Executable, Cloneable, Rollbackable{
@@ -57,6 +56,9 @@ public class Node implements Executable, Cloneable, Rollbackable{
private String currChainId; private String currChainId;
// node 的 isAccess 结果,主要用于 WhenCondition 的提前 isAccess 判断,避免 isAccess 方法重复执行
private boolean accessResult;
private TransmittableThreadLocal<Integer> loopIndexTL = new TransmittableThreadLocal<>(); private TransmittableThreadLocal<Integer> loopIndexTL = new TransmittableThreadLocal<>();
private TransmittableThreadLocal<Object> currLoopObject = new TransmittableThreadLocal<>(); private TransmittableThreadLocal<Object> currLoopObject = new TransmittableThreadLocal<>();
@@ -125,16 +127,13 @@ public class Node implements Executable, Cloneable, Rollbackable{
throw new FlowSystemException("there is no instance for node id " + id); throw new FlowSystemException("there is no instance for node id " + id);
} }
Slot slot = DataBus.getSlot(slotIndex);
try { try {
// 把线程属性赋值给组件对象 // 把线程属性赋值给组件对象
instance.setSlotIndex(slotIndex); instance.setSlotIndex(slotIndex);
instance.setRefNode(this); instance.setRefNode(this);
LiteflowConfig liteflowConfig = LiteflowConfigGetter.get();
// 判断是否可执行所以isAccess经常作为一个组件进入的实际判断要素用作检查slot里的参数的完备性 // 判断是否可执行所以isAccess经常作为一个组件进入的实际判断要素用作检查slot里的参数的完备性
if (instance.isAccess()) { if (accessResult || instance.isAccess()) {
LOG.info("[O]start component[{}] execution", instance.getDisplayName()); LOG.info("[O]start component[{}] execution", instance.getDisplayName());
// 这里开始进行重试的逻辑和主逻辑的运行 // 这里开始进行重试的逻辑和主逻辑的运行
@@ -142,8 +141,7 @@ public class Node implements Executable, Cloneable, Rollbackable{
.buildNodeExecutor(instance.getNodeExecutorClass()); .buildNodeExecutor(instance.getNodeExecutorClass());
// 调用节点执行器进行执行 // 调用节点执行器进行执行
nodeExecutor.execute(instance); nodeExecutor.execute(instance);
} } else {
else {
LOG.info("[X]skip component[{}] execution", instance.getDisplayName()); LOG.info("[X]skip component[{}] execution", instance.getDisplayName());
} }
// 如果组件覆盖了isEnd方法或者在在逻辑中主要调用了setEnd(true)的话,流程就会立马结束 // 如果组件覆盖了isEnd方法或者在在逻辑中主要调用了setEnd(true)的话,流程就会立马结束
@@ -253,6 +251,14 @@ public class Node implements Executable, Cloneable, Rollbackable{
return currChainId; return currChainId;
} }
public boolean getAccessResult() {
return accessResult;
}
public void setAccessResult(boolean accessResult) {
this.accessResult = accessResult;
}
public void setLoopIndex(int index) { public void setLoopIndex(int index) {
this.loopIndexTL.set(index); this.loopIndexTL.set(index);
} }

View File

@@ -1,12 +1,10 @@
package com.yomahub.liteflow.flow.parallel.strategy; package com.yomahub.liteflow.flow.parallel.strategy;
import com.yomahub.liteflow.flow.element.Executable;
import com.yomahub.liteflow.flow.element.condition.WhenCondition; import com.yomahub.liteflow.flow.element.condition.WhenCondition;
import com.yomahub.liteflow.flow.parallel.WhenFutureObj; import com.yomahub.liteflow.flow.parallel.WhenFutureObj;
import java.util.List; import java.util.List;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;
/** /**
* 完成全部任务 * 完成全部任务
@@ -31,9 +29,4 @@ public class AllOfParallelExecutor extends ParallelStrategyExecutor {
} }
//在allOf这个场景中不需要过滤
@Override
protected Stream<Executable> filterAccess(Stream<Executable> stream, Integer slotIndex) {
return stream;
}
} }

View File

@@ -1,12 +1,10 @@
package com.yomahub.liteflow.flow.parallel.strategy; package com.yomahub.liteflow.flow.parallel.strategy;
import com.yomahub.liteflow.flow.element.Executable;
import com.yomahub.liteflow.flow.element.condition.WhenCondition; import com.yomahub.liteflow.flow.element.condition.WhenCondition;
import com.yomahub.liteflow.flow.parallel.WhenFutureObj; import com.yomahub.liteflow.flow.parallel.WhenFutureObj;
import java.util.List; import java.util.List;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;
/** /**
* 完成任一任务 * 完成任一任务
@@ -31,18 +29,4 @@ public class AnyOfParallelExecutor extends ParallelStrategyExecutor {
} }
//在anyOf这个场景中需要过滤掉isAccess为false的场景
//因为不过滤这个的话,如果加上了 any那么 isAccess 为 false 那就是最快的了
//换句话说就是anyOf这个场景isAccess会被执行两次
@Override
protected Stream<Executable> filterAccess(Stream<Executable> stream, Integer slotIndex) {
return stream.filter(executable -> {
try {
return executable.isAccess(slotIndex);
} catch (Exception e) {
LOG.error("there was an error when executing the when component isAccess", e);
return false;
}
});
}
} }

View File

@@ -6,6 +6,7 @@ import cn.hutool.core.util.StrUtil;
import com.yomahub.liteflow.enums.ParallelStrategyEnum; import com.yomahub.liteflow.enums.ParallelStrategyEnum;
import com.yomahub.liteflow.exception.WhenExecuteException; import com.yomahub.liteflow.exception.WhenExecuteException;
import com.yomahub.liteflow.flow.element.Executable; import com.yomahub.liteflow.flow.element.Executable;
import com.yomahub.liteflow.flow.element.Node;
import com.yomahub.liteflow.flow.element.condition.FinallyCondition; import com.yomahub.liteflow.flow.element.condition.FinallyCondition;
import com.yomahub.liteflow.flow.element.condition.PreCondition; import com.yomahub.liteflow.flow.element.condition.PreCondition;
import com.yomahub.liteflow.flow.element.condition.WhenCondition; import com.yomahub.liteflow.flow.element.condition.WhenCondition;
@@ -92,22 +93,23 @@ public abstract class ParallelStrategyExecutor {
protected Stream<Executable> filterWhenTaskList(List<Executable> executableList, Integer slotIndex) { protected Stream<Executable> filterWhenTaskList(List<Executable> executableList, Integer slotIndex) {
// 1.先进行过滤,前置和后置组件过滤掉,因为在 EL Chain 处理的时候已经提出来了 // 1.先进行过滤,前置和后置组件过滤掉,因为在 EL Chain 处理的时候已经提出来了
// 2.过滤 isAccess 为 false 的情况,因为不过滤这个的话,如果加上了 any那么 isAccess 为 false 那就是最快的了 // 2.过滤 isAccess 为 false 的情况,因为不过滤这个的话,如果加上了 any那么 isAccess 为 false 那就是最快的了
Stream<Executable> stream = executableList.stream() // 3.为避免同一个 node 的 isAccess 方法重复执行,给 node 设置 isAccess 方法执行结果
return executableList.stream()
.filter(executable -> !(executable instanceof PreCondition) && !(executable instanceof FinallyCondition)) .filter(executable -> !(executable instanceof PreCondition) && !(executable instanceof FinallyCondition))
.filter(executable -> { .filter(executable -> {
try { try {
return executable.isAccess(slotIndex); boolean access = executable.isAccess(slotIndex);
if (executable instanceof Node) {
((Node) executable).setAccessResult(access);
}
return access;
} catch (Exception e) { } catch (Exception e) {
LOG.error("there was an error when executing the when component isAccess", e); LOG.error("there was an error when executing the when component isAccess", e);
return false; return false;
} }
}); });
return filterAccess(stream, slotIndex);
} }
//过滤isAccess的抽象接口方法
protected abstract Stream<Executable> filterAccess(Stream<Executable> stream, Integer slotIndex);
/** /**
* 获取 WHEN 所需线程池 * 获取 WHEN 所需线程池
* @param whenCondition * @param whenCondition

View File

@@ -1,14 +1,12 @@
package com.yomahub.liteflow.flow.parallel.strategy; package com.yomahub.liteflow.flow.parallel.strategy;
import cn.hutool.core.collection.CollUtil; import cn.hutool.core.collection.CollUtil;
import com.yomahub.liteflow.flow.element.Executable;
import com.yomahub.liteflow.flow.element.condition.WhenCondition; import com.yomahub.liteflow.flow.element.condition.WhenCondition;
import com.yomahub.liteflow.flow.parallel.WhenFutureObj; import com.yomahub.liteflow.flow.parallel.WhenFutureObj;
import java.util.*; import java.util.*;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.stream.Stream;
/** /**
* 完成指定任务执行器,使用 ID 进行比较 * 完成指定任务执行器,使用 ID 进行比较
@@ -77,19 +75,4 @@ public class SpecifyParallelExecutor extends ParallelStrategyExecutor {
} }
//在must这个场景中需要过滤掉isAccess为false的场景
//因为不过滤这个的话,如果加上了 any那么 isAccess 为 false 那就是最快的了
//换句话说就是must这个场景isAccess会被执行两次
@Override
protected Stream<Executable> filterAccess(Stream<Executable> stream, Integer slotIndex) {
return stream.filter(executable -> {
try {
return executable.isAccess(slotIndex);
} catch (Exception e) {
LOG.error("there was an error when executing the when component isAccess", e);
return false;
}
});
}
} }