mirror of
https://gitee.com/dromara/liteFlow.git
synced 2026-06-10 03:07:32 +08:00
!355 bug 修复 PercentageOfParallelExecutor 并行任务执行时可能出现的阻塞问题
Merge pull request !355 from luoyi/issues/IIYPFL
This commit is contained in:
@@ -1,12 +1,13 @@
|
||||
package com.yomahub.liteflow.flow.parallel.strategy;
|
||||
|
||||
import cn.hutool.core.collection.CollUtil;
|
||||
import com.yomahub.liteflow.flow.element.condition.WhenCondition;
|
||||
import com.yomahub.liteflow.flow.parallel.WhenFutureObj;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
import java.util.concurrent.atomic.LongAdder;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
/**
|
||||
* 完成指定阈值任务
|
||||
@@ -22,6 +23,8 @@ public class PercentageOfParallelExecutor extends ParallelStrategyExecutor {
|
||||
// 获取所有 CompletableFuture 任务
|
||||
List<CompletableFuture<WhenFutureObj>> whenAllTaskList = this.getWhenAllTaskList(whenCondition, slotIndex);
|
||||
|
||||
if (CollUtil.isEmpty(whenAllTaskList)) return;
|
||||
|
||||
int total = whenAllTaskList.size();
|
||||
|
||||
// 计算阈值数量(向上取整),为 0 时取 1,表示只等待一个完成,即 any
|
||||
@@ -34,15 +37,13 @@ public class PercentageOfParallelExecutor extends ParallelStrategyExecutor {
|
||||
CompletableFuture<Void> thresholdFuture = new CompletableFuture<>();
|
||||
|
||||
// 原子计数器
|
||||
LongAdder completedCount = new LongAdder();
|
||||
AtomicInteger completedCount = new AtomicInteger();
|
||||
|
||||
// 为每个任务添加回调
|
||||
whenAllTaskList.forEach(future ->
|
||||
future.whenComplete((result, ex) -> {
|
||||
// 计数 +1
|
||||
completedCount.increment();
|
||||
|
||||
int currentCount = completedCount.intValue();
|
||||
int currentCount = completedCount.incrementAndGet();
|
||||
|
||||
if (currentCount <= thresholdCount) {
|
||||
// 添加已完成任务
|
||||
@@ -50,7 +51,7 @@ public class PercentageOfParallelExecutor extends ParallelStrategyExecutor {
|
||||
}
|
||||
|
||||
// 达到阈值时触发门闩(确保只触发一次)
|
||||
if (currentCount >= thresholdCount && !thresholdFuture.isDone()) {
|
||||
if (currentCount == thresholdCount) {
|
||||
thresholdFuture.complete(null);
|
||||
}
|
||||
})
|
||||
|
||||
Reference in New Issue
Block a user