bug #IB0X4Q 修复2.12.4的异步循环产生的bug

This commit is contained in:
everywhere.z
2024-10-30 19:31:17 +08:00
parent 7adfe11491
commit 9a24341b09
3 changed files with 68 additions and 42 deletions

View File

@@ -27,6 +27,7 @@ import com.yomahub.liteflow.slot.Slot;
import com.yomahub.liteflow.util.TupleOf2;
import java.util.Stack;
import java.util.concurrent.locks.ReentrantLock;
/**
* Node节点实现可执行器 Node节点并不是单例的每构建一次都会copy出一个新的实例
@@ -294,22 +295,31 @@ public class Node implements Executable, Cloneable, Rollbackable{
this.isContinueOnErrorResult.remove();
}
// 这个锁用于异步循环场景
private ReentrantLock lock4LoopIndex = new ReentrantLock();
public void setLoopIndex(LoopCondition condition, int index) {
if (this.loopIndexTL.get() == null){
Stack<TupleOf2<Integer, Integer>> stack = new Stack<>();
TupleOf2<Integer, Integer> tuple = new TupleOf2<>(condition.hashCode(), index);
stack.push(tuple);
this.loopIndexTL.set(stack);
}else{
Stack<TupleOf2<Integer, Integer>> stack = this.loopIndexTL.get();
TupleOf2<Integer, Integer> thisConditionTuple = stack.stream().filter(tuple -> tuple.getA().equals(condition.hashCode())).findFirst().orElse(null);
if (thisConditionTuple != null){
thisConditionTuple.setB(index);
}else{
try{
lock4LoopIndex.lock();
if (this.loopIndexTL.get() == null){
Stack<TupleOf2<Integer, Integer>> stack = new Stack<>();
TupleOf2<Integer, Integer> tuple = new TupleOf2<>(condition.hashCode(), index);
stack.push(tuple);
this.loopIndexTL.set(stack);
}else{
Stack<TupleOf2<Integer, Integer>> stack = this.loopIndexTL.get();
TupleOf2<Integer, Integer> thisConditionTuple = stack.stream().filter(tuple -> tuple.getA().equals(condition.hashCode())).findFirst().orElse(null);
if (thisConditionTuple != null){
thisConditionTuple.setB(index);
}else{
TupleOf2<Integer, Integer> tuple = new TupleOf2<>(condition.hashCode(), index);
stack.push(tuple);
}
}
}finally {
lock4LoopIndex.unlock();
}
}
public Integer getLoopIndex() {
@@ -335,31 +345,44 @@ public class Node implements Executable, Cloneable, Rollbackable{
}
public void removeLoopIndex() {
Stack<TupleOf2<Integer, Integer>> stack = this.loopIndexTL.get();
if (stack != null){
if (stack.size() > 1){
stack.pop();
}else{
this.loopIndexTL.remove();
try{
lock4LoopIndex.lock();
Stack<TupleOf2<Integer, Integer>> stack = this.loopIndexTL.get();
if (stack != null){
if (stack.size() > 1){
stack.pop();
}else{
this.loopIndexTL.remove();
}
}
}finally {
lock4LoopIndex.unlock();
}
}
// 这个锁用于异步循环场景
private ReentrantLock lock4LoopObj = new ReentrantLock();
public void setCurrLoopObject(LoopCondition condition, Object obj) {
if (this.loopObjectTL.get() == null){
Stack<TupleOf2<Integer, Object>> stack = new Stack<>();
TupleOf2<Integer, Object> tuple = new TupleOf2<>(condition.hashCode(), obj);
stack.push(tuple);
this.loopObjectTL.set(stack);
}else{
Stack<TupleOf2<Integer, Object>> stack = this.loopObjectTL.get();
TupleOf2<Integer, Object> thisConditionTuple = stack.stream().filter(tuple -> tuple.getA().equals(condition.hashCode())).findFirst().orElse(null);
if (thisConditionTuple != null){
thisConditionTuple.setB(obj);
}else{
try{
lock4LoopObj.lock();
if (this.loopObjectTL.get() == null){
Stack<TupleOf2<Integer, Object>> stack = new Stack<>();
TupleOf2<Integer, Object> tuple = new TupleOf2<>(condition.hashCode(), obj);
stack.push(tuple);
this.loopObjectTL.set(stack);
}else{
Stack<TupleOf2<Integer, Object>> stack = this.loopObjectTL.get();
TupleOf2<Integer, Object> thisConditionTuple = stack.stream().filter(tuple -> tuple.getA().equals(condition.hashCode())).findFirst().orElse(null);
if (thisConditionTuple != null){
thisConditionTuple.setB(obj);
}else{
TupleOf2<Integer, Object> tuple = new TupleOf2<>(condition.hashCode(), obj);
stack.push(tuple);
}
}
}finally {
lock4LoopObj.unlock();
}
}
@@ -386,14 +409,20 @@ public class Node implements Executable, Cloneable, Rollbackable{
}
public void removeCurrLoopObject() {
Stack<TupleOf2<Integer, Object>> stack = this.loopObjectTL.get();
if (stack != null){
if (stack.size() > 1){
stack.pop();
}else{
this.loopObjectTL.remove();
try{
lock4LoopObj.lock();
Stack<TupleOf2<Integer, Object>> stack = this.loopObjectTL.get();
if (stack != null){
if (stack.size() > 1){
stack.pop();
}else{
this.loopObjectTL.remove();
}
}
}finally {
lock4LoopObj.unlock();
}
}
public Integer getSlotIndex(){
@@ -442,6 +471,8 @@ public class Node implements Executable, Cloneable, Rollbackable{
node.slotIndexTL = new TransmittableThreadLocal<>();
node.isEndTL = new TransmittableThreadLocal<>();
node.isContinueOnErrorResult = new TransmittableThreadLocal<>();
node.lock4LoopIndex = new ReentrantLock();
node.lock4LoopObj = new ReentrantLock();
return node;
}
}

View File

@@ -102,11 +102,6 @@ public abstract class LoopCondition extends Condition {
}
}
// 这个锁用于异步循环场景
// 当异步循环时,其实等同于所有的循环的子项在一个线程池内进行提交。
// 这时候如果不加锁的话在Node对象中的迭代TL对象以及循环下标TL对象由于要进行stream的循环但是原stack对象会被其他线程修改掉从而报错
private final ReentrantLock lock = new ReentrantLock();
// 循环并行执行的Supplier封装
public class LoopParallelSupplier implements Supplier<LoopFutureObj> {
private final Executable executableItem;
@@ -134,7 +129,6 @@ public abstract class LoopCondition extends Condition {
@Override
public LoopFutureObj get() {
lock.lock();
try {
executableItem.setCurrChainId(this.currChainId);
// 设置循环index
@@ -147,8 +141,6 @@ public abstract class LoopCondition extends Condition {
return LoopFutureObj.success(executableItem.getId());
} catch (Exception e) {
return LoopFutureObj.fail(executableItem.getId(), e);
}finally {
lock.unlock();
}
}
}