mirror of
https://gitee.com/dromara/liteFlow.git
synced 2026-06-12 12:41:04 +08:00
!263 修复 WhenCondition 下的 node 重复执行 isAccess 方法问题
Merge pull request !263 from luoyi/issues/I8MXHX
This commit is contained in:
@@ -21,8 +21,6 @@ import com.yomahub.liteflow.flow.executor.NodeExecutor;
|
||||
import com.yomahub.liteflow.flow.executor.NodeExecutorHelper;
|
||||
import com.yomahub.liteflow.log.LFLog;
|
||||
import com.yomahub.liteflow.log.LFLoggerManager;
|
||||
import com.yomahub.liteflow.property.LiteflowConfig;
|
||||
import com.yomahub.liteflow.property.LiteflowConfigGetter;
|
||||
import com.yomahub.liteflow.slot.DataBus;
|
||||
import com.yomahub.liteflow.slot.Slot;
|
||||
|
||||
@@ -30,6 +28,7 @@ import com.yomahub.liteflow.slot.Slot;
|
||||
* Node节点,实现可执行器 Node节点并不是单例的,每构建一次都会copy出一个新的实例
|
||||
*
|
||||
* @author Bryan.Zhang
|
||||
* @author luo yi
|
||||
*/
|
||||
public class Node implements Executable, Cloneable, Rollbackable{
|
||||
|
||||
@@ -57,6 +56,9 @@ public class Node implements Executable, Cloneable, Rollbackable{
|
||||
|
||||
private String currChainId;
|
||||
|
||||
// node 的 isAccess 结果,主要用于 WhenCondition 的提前 isAccess 判断,避免 isAccess 方法重复执行
|
||||
private TransmittableThreadLocal<Boolean> accessResult = new TransmittableThreadLocal<>();
|
||||
|
||||
private TransmittableThreadLocal<Integer> loopIndexTL = new TransmittableThreadLocal<>();
|
||||
|
||||
private TransmittableThreadLocal<Object> currLoopObject = new TransmittableThreadLocal<>();
|
||||
@@ -125,16 +127,13 @@ public class Node implements Executable, Cloneable, Rollbackable{
|
||||
throw new FlowSystemException("there is no instance for node id " + id);
|
||||
}
|
||||
|
||||
Slot slot = DataBus.getSlot(slotIndex);
|
||||
try {
|
||||
// 把线程属性赋值给组件对象
|
||||
instance.setSlotIndex(slotIndex);
|
||||
instance.setRefNode(this);
|
||||
|
||||
LiteflowConfig liteflowConfig = LiteflowConfigGetter.get();
|
||||
|
||||
// 判断是否可执行,所以isAccess经常作为一个组件进入的实际判断要素,用作检查slot里的参数的完备性
|
||||
if (instance.isAccess()) {
|
||||
if (getAccessResult() || instance.isAccess()) {
|
||||
LOG.info("[O]start component[{}] execution", instance.getDisplayName());
|
||||
|
||||
// 这里开始进行重试的逻辑和主逻辑的运行
|
||||
@@ -142,8 +141,7 @@ public class Node implements Executable, Cloneable, Rollbackable{
|
||||
.buildNodeExecutor(instance.getNodeExecutorClass());
|
||||
// 调用节点执行器进行执行
|
||||
nodeExecutor.execute(instance);
|
||||
}
|
||||
else {
|
||||
} else {
|
||||
LOG.info("[X]skip component[{}] execution", instance.getDisplayName());
|
||||
}
|
||||
// 如果组件覆盖了isEnd方法,或者在在逻辑中主要调用了setEnd(true)的话,流程就会立马结束
|
||||
@@ -178,6 +176,7 @@ public class Node implements Executable, Cloneable, Rollbackable{
|
||||
instance.removeIsEnd();
|
||||
instance.removeRefNode();
|
||||
removeLoopIndex();
|
||||
removeAccessResult();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -253,6 +252,19 @@ public class Node implements Executable, Cloneable, Rollbackable{
|
||||
return currChainId;
|
||||
}
|
||||
|
||||
public boolean getAccessResult() {
|
||||
Boolean result = this.accessResult.get();
|
||||
return result == null ? false : result;
|
||||
}
|
||||
|
||||
public void setAccessResult(boolean accessResult) {
|
||||
this.accessResult.set(accessResult);
|
||||
}
|
||||
|
||||
public void removeAccessResult() {
|
||||
this.accessResult.remove();
|
||||
}
|
||||
|
||||
public void setLoopIndex(int index) {
|
||||
this.loopIndexTL.set(index);
|
||||
}
|
||||
@@ -299,6 +311,7 @@ public class Node implements Executable, Cloneable, Rollbackable{
|
||||
Node node = (Node)this.clone();
|
||||
node.loopIndexTL = new TransmittableThreadLocal<>();
|
||||
node.currLoopObject = new TransmittableThreadLocal<>();
|
||||
node.accessResult = new TransmittableThreadLocal<>();
|
||||
return node;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -31,9 +31,10 @@ public class AllOfParallelExecutor extends ParallelStrategyExecutor {
|
||||
|
||||
}
|
||||
|
||||
//在allOf这个场景中,不需要过滤
|
||||
// 在 allOf 这个场景中,不需要过滤
|
||||
@Override
|
||||
protected Stream<Executable> filterAccess(Stream<Executable> stream, Integer slotIndex) {
|
||||
return stream;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -1,12 +1,10 @@
|
||||
package com.yomahub.liteflow.flow.parallel.strategy;
|
||||
|
||||
import com.yomahub.liteflow.flow.element.Executable;
|
||||
import com.yomahub.liteflow.flow.element.condition.WhenCondition;
|
||||
import com.yomahub.liteflow.flow.parallel.WhenFutureObj;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
/**
|
||||
* 完成任一任务
|
||||
@@ -31,18 +29,4 @@ public class AnyOfParallelExecutor extends ParallelStrategyExecutor {
|
||||
|
||||
}
|
||||
|
||||
//在anyOf这个场景中,需要过滤掉isAccess为false的场景
|
||||
//因为不过滤这个的话,如果加上了 any,那么 isAccess 为 false 那就是最快的了
|
||||
//换句话说,就是anyOf这个场景,isAccess会被执行两次
|
||||
@Override
|
||||
protected Stream<Executable> filterAccess(Stream<Executable> stream, Integer slotIndex) {
|
||||
return stream.filter(executable -> {
|
||||
try {
|
||||
return executable.isAccess(slotIndex);
|
||||
} catch (Exception e) {
|
||||
LOG.error("there was an error when executing the when component isAccess", e);
|
||||
return false;
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -6,6 +6,7 @@ import cn.hutool.core.util.StrUtil;
|
||||
import com.yomahub.liteflow.enums.ParallelStrategyEnum;
|
||||
import com.yomahub.liteflow.exception.WhenExecuteException;
|
||||
import com.yomahub.liteflow.flow.element.Executable;
|
||||
import com.yomahub.liteflow.flow.element.Node;
|
||||
import com.yomahub.liteflow.flow.element.condition.FinallyCondition;
|
||||
import com.yomahub.liteflow.flow.element.condition.PreCondition;
|
||||
import com.yomahub.liteflow.flow.element.condition.WhenCondition;
|
||||
@@ -97,8 +98,21 @@ public abstract class ParallelStrategyExecutor {
|
||||
return filterAccess(stream, slotIndex);
|
||||
}
|
||||
|
||||
//过滤isAccess的抽象接口方法
|
||||
protected abstract Stream<Executable> filterAccess(Stream<Executable> stream, Integer slotIndex);
|
||||
// 过滤 isAccess 的方法,默认实现,同时为避免同一个 node 的 isAccess 方法重复执行,给 node 设置 isAccess 方法执行结果
|
||||
protected Stream<Executable> filterAccess(Stream<Executable> stream, Integer slotIndex) {
|
||||
return stream.filter(executable -> {
|
||||
try {
|
||||
boolean access = executable.isAccess(slotIndex);
|
||||
if (executable instanceof Node) {
|
||||
((Node) executable).setAccessResult(access);
|
||||
}
|
||||
return access;
|
||||
} catch (Exception e) {
|
||||
LOG.error("there was an error when executing the when component isAccess", e);
|
||||
return false;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取 WHEN 所需线程池
|
||||
|
||||
@@ -1,14 +1,12 @@
|
||||
package com.yomahub.liteflow.flow.parallel.strategy;
|
||||
|
||||
import cn.hutool.core.collection.CollUtil;
|
||||
import com.yomahub.liteflow.flow.element.Executable;
|
||||
import com.yomahub.liteflow.flow.element.condition.WhenCondition;
|
||||
import com.yomahub.liteflow.flow.parallel.WhenFutureObj;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
/**
|
||||
* 完成指定任务执行器,使用 ID 进行比较
|
||||
@@ -77,19 +75,4 @@ public class SpecifyParallelExecutor extends ParallelStrategyExecutor {
|
||||
|
||||
}
|
||||
|
||||
//在must这个场景中,需要过滤掉isAccess为false的场景
|
||||
//因为不过滤这个的话,如果加上了 any,那么 isAccess 为 false 那就是最快的了
|
||||
//换句话说,就是must这个场景,isAccess会被执行两次
|
||||
@Override
|
||||
protected Stream<Executable> filterAccess(Stream<Executable> stream, Integer slotIndex) {
|
||||
return stream.filter(executable -> {
|
||||
try {
|
||||
return executable.isAccess(slotIndex);
|
||||
} catch (Exception e) {
|
||||
LOG.error("there was an error when executing the when component isAccess", e);
|
||||
return false;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -6,7 +6,6 @@ import com.yomahub.liteflow.test.BaseTest;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.noear.snack.ONode;
|
||||
import org.noear.solon.annotation.Import;
|
||||
import org.noear.solon.annotation.Inject;
|
||||
import org.noear.solon.test.SolonJUnit5Extension;
|
||||
|
||||
@@ -10,6 +10,7 @@ import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.noear.solon.annotation.Import;
|
||||
import org.noear.solon.annotation.Inject;
|
||||
import org.noear.solon.test.SolonJUnit5Extension;
|
||||
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
/**
|
||||
|
||||
@@ -10,7 +10,6 @@ import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.noear.solon.annotation.Import;
|
||||
import org.noear.solon.annotation.Inject;
|
||||
import org.noear.solon.test.SolonJUnit5Extension;
|
||||
import org.noear.solon.test.annotation.TestPropertySource;
|
||||
|
||||
|
||||
@ExtendWith(SolonJUnit5Extension.class)
|
||||
|
||||
Reference in New Issue
Block a user