mirror of
https://gitee.com/dromara/liteFlow.git
synced 2026-05-14 04:02:09 +08:00
bug 更换 PercentageOfParallelExecutor 任务收集容器,避免高并发环境下不断扩容影响性能;复原其他测试案例
This commit is contained in:
@@ -3,11 +3,10 @@ package com.yomahub.liteflow.flow.parallel.strategy;
|
||||
import com.yomahub.liteflow.flow.element.condition.WhenCondition;
|
||||
import com.yomahub.liteflow.flow.parallel.WhenFutureObj;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
import java.util.concurrent.atomic.LongAdder;
|
||||
|
||||
/**
|
||||
* 完成指定阈值任务
|
||||
@@ -28,22 +27,24 @@ public class PercentageOfParallelExecutor extends ParallelStrategyExecutor {
|
||||
// 计算阈值数量(向上取整)
|
||||
int thresholdCount = (int) Math.ceil(total * whenCondition.getPercentage());
|
||||
|
||||
// 已完成任务收集器(对 List 加锁保证线程安全)
|
||||
List<CompletableFuture<WhenFutureObj>> completedFutures = Collections.synchronizedList(new ArrayList<>(Math.max(thresholdCount, 1) << 1));
|
||||
// 已完成任务收集器
|
||||
ConcurrentLinkedQueue<CompletableFuture<WhenFutureObj>> completedFutures = new ConcurrentLinkedQueue<>();
|
||||
|
||||
// 阈值触发门闩
|
||||
CompletableFuture<Void> thresholdFuture = new CompletableFuture<>();
|
||||
|
||||
// 原子计数器
|
||||
AtomicInteger completedCount = new AtomicInteger(0);
|
||||
LongAdder completedCount = new LongAdder();
|
||||
|
||||
// 为每个任务添加回调
|
||||
whenAllTaskList.forEach(future ->
|
||||
future.whenComplete((result, ex) -> {
|
||||
// 安全添加已完成任务
|
||||
completedFutures.add(future);
|
||||
// 计数 +1
|
||||
completedCount.increment();
|
||||
// 检查是否达到阈值
|
||||
if (completedCount.incrementAndGet() >= thresholdCount) {
|
||||
if (completedCount.intValue() >= thresholdCount) {
|
||||
// 确保只触发一次
|
||||
if (!thresholdFuture.isDone()) {
|
||||
thresholdFuture.complete(null);
|
||||
|
||||
@@ -9,12 +9,12 @@ import com.yomahub.liteflow.log.LFLog;
|
||||
import com.yomahub.liteflow.log.LFLoggerManager;
|
||||
import com.yomahub.liteflow.parser.redis.mode.RClient;
|
||||
import com.yomahub.liteflow.parser.redis.mode.polling.RedisParserPollingMode;
|
||||
|
||||
import com.yomahub.liteflow.slot.DefaultContext;
|
||||
import com.yomahub.liteflow.test.BaseTest;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.*;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
|
||||
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
import org.springframework.boot.test.mock.mockito.MockBean;
|
||||
@@ -23,6 +23,7 @@ import org.springframework.test.context.TestPropertySource;
|
||||
import org.springframework.test.context.junit.jupiter.SpringExtension;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
|
||||
import java.lang.reflect.Field;
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
|
||||
@@ -6,11 +6,10 @@ import com.yomahub.liteflow.flow.LiteflowResponse;
|
||||
import com.yomahub.liteflow.parser.helper.NodeConvertHelper;
|
||||
import com.yomahub.liteflow.parser.redis.mode.RClient;
|
||||
import com.yomahub.liteflow.parser.redis.mode.RedisParserHelper;
|
||||
|
||||
import com.yomahub.liteflow.slot.DefaultContext;
|
||||
import com.yomahub.liteflow.test.BaseTest;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.*;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.mockito.Mock;
|
||||
import org.redisson.api.RMapCache;
|
||||
@@ -23,6 +22,7 @@ import org.springframework.test.context.TestPropertySource;
|
||||
import org.springframework.test.context.junit.jupiter.SpringExtension;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
@@ -11,9 +11,7 @@ import com.yomahub.liteflow.parser.redis.mode.RClient;
|
||||
import com.yomahub.liteflow.parser.redis.mode.polling.RedisParserPollingMode;
|
||||
import com.yomahub.liteflow.slot.DefaultContext;
|
||||
import com.yomahub.liteflow.test.BaseTest;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.*;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
|
||||
@@ -2,12 +2,14 @@ package com.yomahub.liteflow.test.script.javapro.parseOneMode;
|
||||
|
||||
import com.yomahub.liteflow.core.FlowExecutor;
|
||||
import com.yomahub.liteflow.flow.LiteflowResponse;
|
||||
import com.yomahub.liteflow.slot.DefaultContext;
|
||||
import com.yomahub.liteflow.test.BaseTest;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
import org.springframework.context.annotation.ComponentScan;
|
||||
import org.springframework.test.context.TestPropertySource;
|
||||
import org.springframework.test.context.junit.jupiter.SpringExtension;
|
||||
|
||||
|
||||
Reference in New Issue
Block a user