bug #IASW3I 异步循环迭代组件中迭代对象以及迭代下标存在并发问题

This commit is contained in:
everywhere.z
2024-10-09 00:29:02 +08:00
parent 4f0d983c84
commit c632c5219e
4 changed files with 44 additions and 2 deletions

View File

@@ -8,6 +8,7 @@ import com.yomahub.liteflow.flow.parallel.LoopFutureObj;
import java.util.List; import java.util.List;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier; import java.util.function.Supplier;
/** /**
@@ -101,6 +102,11 @@ public abstract class LoopCondition extends Condition {
} }
} }
// 这个锁用于异步循环场景
// 当异步循环时,其实等同于所有的循环的子项在一个线程池内进行提交。
// 这时候如果不加锁的话在Node对象中的迭代TL对象以及循环下标TL对象由于要进行stream的循环但是原stack对象会被其他线程修改掉从而报错
private final ReentrantLock lock = new ReentrantLock();
// 循环并行执行的Supplier封装 // 循环并行执行的Supplier封装
public class LoopParallelSupplier implements Supplier<LoopFutureObj> { public class LoopParallelSupplier implements Supplier<LoopFutureObj> {
private final Executable executableItem; private final Executable executableItem;
@@ -128,6 +134,7 @@ public abstract class LoopCondition extends Condition {
@Override @Override
public LoopFutureObj get() { public LoopFutureObj get() {
lock.lock();
try { try {
executableItem.setCurrChainId(this.currChainId); executableItem.setCurrChainId(this.currChainId);
// 设置循环index // 设置循环index
@@ -140,6 +147,8 @@ public abstract class LoopCondition extends Condition {
return LoopFutureObj.success(executableItem.getId()); return LoopFutureObj.success(executableItem.getId());
} catch (Exception e) { } catch (Exception e) {
return LoopFutureObj.fail(executableItem.getId(), e); return LoopFutureObj.fail(executableItem.getId(), e);
}finally {
lock.unlock();
} }
} }
} }

View File

@@ -64,8 +64,17 @@ public class IteratorELSpringbootTest extends BaseTest {
DefaultContext context = response.getFirstContextBean(); DefaultContext context = response.getFirstContextBean();
String indexStr = context.getData("index_str"); String indexStr = context.getData("index_str");
String objStr = context.getData("obj_str"); String objStr = context.getData("obj_str");
System.out.println(indexStr); Assertions.assertEquals("[00][01][10][11][20][21]", indexStr);
System.out.println(objStr); Assertions.assertEquals("[a11][a22][b11][b22][c11][c22]", objStr);
Assertions.assertTrue(response.isSuccess()); Assertions.assertTrue(response.isSuccess());
} }
//测试多层迭代异步循环的正确性
@Test
public void testIt5() throws Exception {
for (int i = 0; i < 100; i++) {
LiteflowResponse response = flowExecutor.execute2Resp("chain5");
Assertions.assertTrue(response.isSuccess());
}
}
} }

View File

@@ -0,0 +1,18 @@
package com.yomahub.liteflow.test.iterator.cmp;
import cn.hutool.core.util.StrUtil;
import com.yomahub.liteflow.core.NodeComponent;
import com.yomahub.liteflow.slot.DefaultContext;
import org.springframework.stereotype.Component;
@Component("f")
public class FCmp extends NodeComponent {
@Override
public void process() throws Exception {
Object obj1 = this.getPreLoopObj();
Object obj2 = this.getCurrLoopObj();
System.out.println(StrUtil.format("{}{}", obj1, obj2));
}
}

View File

@@ -24,4 +24,10 @@
ITERATOR(x2).DO(e) ITERATOR(x2).DO(e)
); );
</chain> </chain>
<chain name="chain5">
ITERATOR(x1).parallel(true).DO(
ITERATOR(x2).parallel(true).DO(f)
);
</chain>
</flow> </flow>