diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/Node.java b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/Node.java index d0d4586ca..ad0cb1730 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/Node.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/Node.java @@ -27,6 +27,7 @@ import com.yomahub.liteflow.slot.Slot; import com.yomahub.liteflow.util.TupleOf2; import java.util.Stack; +import java.util.concurrent.locks.ReentrantLock; /** * Node节点,实现可执行器 Node节点并不是单例的,每构建一次都会copy出一个新的实例 @@ -294,22 +295,31 @@ public class Node implements Executable, Cloneable, Rollbackable{ this.isContinueOnErrorResult.remove(); } + // 这个锁用于异步循环场景 + private ReentrantLock lock4LoopIndex = new ReentrantLock(); + public void setLoopIndex(LoopCondition condition, int index) { - if (this.loopIndexTL.get() == null){ - Stack> stack = new Stack<>(); - TupleOf2 tuple = new TupleOf2<>(condition.hashCode(), index); - stack.push(tuple); - this.loopIndexTL.set(stack); - }else{ - Stack> stack = this.loopIndexTL.get(); - TupleOf2 thisConditionTuple = stack.stream().filter(tuple -> tuple.getA().equals(condition.hashCode())).findFirst().orElse(null); - if (thisConditionTuple != null){ - thisConditionTuple.setB(index); - }else{ + try{ + lock4LoopIndex.lock(); + if (this.loopIndexTL.get() == null){ + Stack> stack = new Stack<>(); TupleOf2 tuple = new TupleOf2<>(condition.hashCode(), index); stack.push(tuple); + this.loopIndexTL.set(stack); + }else{ + Stack> stack = this.loopIndexTL.get(); + TupleOf2 thisConditionTuple = stack.stream().filter(tuple -> tuple.getA().equals(condition.hashCode())).findFirst().orElse(null); + if (thisConditionTuple != null){ + thisConditionTuple.setB(index); + }else{ + TupleOf2 tuple = new TupleOf2<>(condition.hashCode(), index); + stack.push(tuple); + } } + }finally { + lock4LoopIndex.unlock(); } + } public Integer getLoopIndex() { @@ -335,31 +345,44 @@ public class Node implements Executable, Cloneable, Rollbackable{ } public void removeLoopIndex() { - Stack> stack = this.loopIndexTL.get(); - if (stack != null){ - if (stack.size() > 1){ - stack.pop(); - }else{ - this.loopIndexTL.remove(); + try{ + lock4LoopIndex.lock(); + Stack> stack = this.loopIndexTL.get(); + if (stack != null){ + if (stack.size() > 1){ + stack.pop(); + }else{ + this.loopIndexTL.remove(); + } } + }finally { + lock4LoopIndex.unlock(); } } + // 这个锁用于异步循环场景 + private ReentrantLock lock4LoopObj = new ReentrantLock(); + public void setCurrLoopObject(LoopCondition condition, Object obj) { - if (this.loopObjectTL.get() == null){ - Stack> stack = new Stack<>(); - TupleOf2 tuple = new TupleOf2<>(condition.hashCode(), obj); - stack.push(tuple); - this.loopObjectTL.set(stack); - }else{ - Stack> stack = this.loopObjectTL.get(); - TupleOf2 thisConditionTuple = stack.stream().filter(tuple -> tuple.getA().equals(condition.hashCode())).findFirst().orElse(null); - if (thisConditionTuple != null){ - thisConditionTuple.setB(obj); - }else{ + try{ + lock4LoopObj.lock(); + if (this.loopObjectTL.get() == null){ + Stack> stack = new Stack<>(); TupleOf2 tuple = new TupleOf2<>(condition.hashCode(), obj); stack.push(tuple); + this.loopObjectTL.set(stack); + }else{ + Stack> stack = this.loopObjectTL.get(); + TupleOf2 thisConditionTuple = stack.stream().filter(tuple -> tuple.getA().equals(condition.hashCode())).findFirst().orElse(null); + if (thisConditionTuple != null){ + thisConditionTuple.setB(obj); + }else{ + TupleOf2 tuple = new TupleOf2<>(condition.hashCode(), obj); + stack.push(tuple); + } } + }finally { + lock4LoopObj.unlock(); } } @@ -386,14 +409,20 @@ public class Node implements Executable, Cloneable, Rollbackable{ } public void removeCurrLoopObject() { - Stack> stack = this.loopObjectTL.get(); - if (stack != null){ - if (stack.size() > 1){ - stack.pop(); - }else{ - this.loopObjectTL.remove(); + try{ + lock4LoopObj.lock(); + Stack> stack = this.loopObjectTL.get(); + if (stack != null){ + if (stack.size() > 1){ + stack.pop(); + }else{ + this.loopObjectTL.remove(); + } } + }finally { + lock4LoopObj.unlock(); } + } public Integer getSlotIndex(){ @@ -442,6 +471,8 @@ public class Node implements Executable, Cloneable, Rollbackable{ node.slotIndexTL = new TransmittableThreadLocal<>(); node.isEndTL = new TransmittableThreadLocal<>(); node.isContinueOnErrorResult = new TransmittableThreadLocal<>(); + node.lock4LoopIndex = new ReentrantLock(); + node.lock4LoopObj = new ReentrantLock(); return node; } } diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/LoopCondition.java b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/LoopCondition.java index 5242ee839..5575121ad 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/LoopCondition.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/LoopCondition.java @@ -102,11 +102,6 @@ public abstract class LoopCondition extends Condition { } } - // 这个锁用于异步循环场景 - // 当异步循环时,其实等同于所有的循环的子项在一个线程池内进行提交。 - // 这时候如果不加锁的话,在Node对象中的迭代TL对象以及循环下标TL对象,由于要进行stream的循环,但是原stack对象会被其他线程修改掉,从而报错 - private final ReentrantLock lock = new ReentrantLock(); - // 循环并行执行的Supplier封装 public class LoopParallelSupplier implements Supplier { private final Executable executableItem; @@ -134,7 +129,6 @@ public abstract class LoopCondition extends Condition { @Override public LoopFutureObj get() { - lock.lock(); try { executableItem.setCurrChainId(this.currChainId); // 设置循环index @@ -147,8 +141,6 @@ public abstract class LoopCondition extends Condition { return LoopFutureObj.success(executableItem.getId()); } catch (Exception e) { return LoopFutureObj.fail(executableItem.getId(), e); - }finally { - lock.unlock(); } } } diff --git a/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/iterator/cmp/FCmp.java b/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/iterator/cmp/FCmp.java index f7c701f3b..570973b93 100644 --- a/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/iterator/cmp/FCmp.java +++ b/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/iterator/cmp/FCmp.java @@ -13,6 +13,9 @@ public class FCmp extends NodeComponent { public void process() throws Exception { Object obj1 = this.getPreLoopObj(); Object obj2 = this.getCurrLoopObj(); + if (obj1 == null || obj2 == null) { + throw new RuntimeException(""); + } System.out.println(StrUtil.format("{}{}", obj1, obj2)); } }