From 504a3c6361a77a2932885c0efb286f4a3fe3ea15 Mon Sep 17 00:00:00 2001 From: luoyi <972849752@qq.com> Date: Tue, 15 Jul 2025 15:38:04 +0800 Subject: [PATCH] =?UTF-8?q?bug=20=E6=9B=B4=E6=8D=A2=20PercentageOfParallel?= =?UTF-8?q?Executor=20=E4=BB=BB=E5=8A=A1=E6=94=B6=E9=9B=86=E5=AE=B9?= =?UTF-8?q?=E5=99=A8=EF=BC=8C=E9=81=BF=E5=85=8D=E9=AB=98=E5=B9=B6=E5=8F=91?= =?UTF-8?q?=E7=8E=AF=E5=A2=83=E4=B8=8B=E4=B8=8D=E6=96=AD=E6=89=A9=E5=AE=B9?= =?UTF-8?q?=E5=BD=B1=E5=93=8D=E6=80=A7=E8=83=BD=EF=BC=9B=E5=A4=8D=E5=8E=9F?= =?UTF-8?q?=E5=85=B6=E4=BB=96=E6=B5=8B=E8=AF=95=E6=A1=88=E4=BE=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../strategy/PercentageOfParallelExecutor.java | 15 ++++++++------- .../redis/RedisClusterPollSpringBootTest.java | 7 ++++--- .../RedisClusterSubscribeSpringBootTest.java | 6 +++--- .../redis/RedisWithXmlELPollSpringbootTest.java | 4 +--- .../ScriptJavaxProParseOneModeTest.java | 2 ++ 5 files changed, 18 insertions(+), 16 deletions(-) diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/PercentageOfParallelExecutor.java b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/PercentageOfParallelExecutor.java index 966beadaa..8e9460b33 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/PercentageOfParallelExecutor.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/PercentageOfParallelExecutor.java @@ -3,11 +3,10 @@ package com.yomahub.liteflow.flow.parallel.strategy; import com.yomahub.liteflow.flow.element.condition.WhenCondition; import com.yomahub.liteflow.flow.parallel.WhenFutureObj; -import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.LongAdder; /** * 完成指定阈值任务 @@ -28,22 +27,24 @@ public class PercentageOfParallelExecutor extends ParallelStrategyExecutor { // 计算阈值数量(向上取整) int thresholdCount = (int) Math.ceil(total * whenCondition.getPercentage()); - // 已完成任务收集器(对 List 加锁保证线程安全) - List> completedFutures = Collections.synchronizedList(new ArrayList<>(Math.max(thresholdCount, 1) << 1)); + // 已完成任务收集器 + ConcurrentLinkedQueue> completedFutures = new ConcurrentLinkedQueue<>(); // 阈值触发门闩 CompletableFuture thresholdFuture = new CompletableFuture<>(); // 原子计数器 - AtomicInteger completedCount = new AtomicInteger(0); + LongAdder completedCount = new LongAdder(); // 为每个任务添加回调 whenAllTaskList.forEach(future -> future.whenComplete((result, ex) -> { // 安全添加已完成任务 completedFutures.add(future); + // 计数 +1 + completedCount.increment(); // 检查是否达到阈值 - if (completedCount.incrementAndGet() >= thresholdCount) { + if (completedCount.intValue() >= thresholdCount) { // 确保只触发一次 if (!thresholdFuture.isDone()) { thresholdFuture.complete(null); diff --git a/liteflow-testcase-el/liteflow-testcase-el-redis-springboot/src/test/java/com/yomahub/liteflow/test/redis/RedisClusterPollSpringBootTest.java b/liteflow-testcase-el/liteflow-testcase-el-redis-springboot/src/test/java/com/yomahub/liteflow/test/redis/RedisClusterPollSpringBootTest.java index 8f366f29c..14f697796 100644 --- a/liteflow-testcase-el/liteflow-testcase-el-redis-springboot/src/test/java/com/yomahub/liteflow/test/redis/RedisClusterPollSpringBootTest.java +++ b/liteflow-testcase-el/liteflow-testcase-el-redis-springboot/src/test/java/com/yomahub/liteflow/test/redis/RedisClusterPollSpringBootTest.java @@ -9,12 +9,12 @@ import com.yomahub.liteflow.log.LFLog; import com.yomahub.liteflow.log.LFLoggerManager; import com.yomahub.liteflow.parser.redis.mode.RClient; import com.yomahub.liteflow.parser.redis.mode.polling.RedisParserPollingMode; + import com.yomahub.liteflow.slot.DefaultContext; import com.yomahub.liteflow.test.BaseTest; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.*; import org.junit.jupiter.api.extension.ExtendWith; + import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.mock.mockito.MockBean; @@ -23,6 +23,7 @@ import org.springframework.test.context.TestPropertySource; import org.springframework.test.context.junit.jupiter.SpringExtension; import javax.annotation.Resource; + import java.lang.reflect.Field; import java.util.HashSet; import java.util.Set; diff --git a/liteflow-testcase-el/liteflow-testcase-el-redis-springboot/src/test/java/com/yomahub/liteflow/test/redis/RedisClusterSubscribeSpringBootTest.java b/liteflow-testcase-el/liteflow-testcase-el-redis-springboot/src/test/java/com/yomahub/liteflow/test/redis/RedisClusterSubscribeSpringBootTest.java index 3f5368d4b..f90f70985 100644 --- a/liteflow-testcase-el/liteflow-testcase-el-redis-springboot/src/test/java/com/yomahub/liteflow/test/redis/RedisClusterSubscribeSpringBootTest.java +++ b/liteflow-testcase-el/liteflow-testcase-el-redis-springboot/src/test/java/com/yomahub/liteflow/test/redis/RedisClusterSubscribeSpringBootTest.java @@ -6,11 +6,10 @@ import com.yomahub.liteflow.flow.LiteflowResponse; import com.yomahub.liteflow.parser.helper.NodeConvertHelper; import com.yomahub.liteflow.parser.redis.mode.RClient; import com.yomahub.liteflow.parser.redis.mode.RedisParserHelper; + import com.yomahub.liteflow.slot.DefaultContext; import com.yomahub.liteflow.test.BaseTest; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.*; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.redisson.api.RMapCache; @@ -23,6 +22,7 @@ import org.springframework.test.context.TestPropertySource; import org.springframework.test.context.junit.jupiter.SpringExtension; import javax.annotation.Resource; + import java.util.HashSet; import java.util.Map; import java.util.Set; diff --git a/liteflow-testcase-el/liteflow-testcase-el-redis-springboot/src/test/java/com/yomahub/liteflow/test/redis/RedisWithXmlELPollSpringbootTest.java b/liteflow-testcase-el/liteflow-testcase-el-redis-springboot/src/test/java/com/yomahub/liteflow/test/redis/RedisWithXmlELPollSpringbootTest.java index bf3d35f2f..2c0a7397a 100644 --- a/liteflow-testcase-el/liteflow-testcase-el-redis-springboot/src/test/java/com/yomahub/liteflow/test/redis/RedisWithXmlELPollSpringbootTest.java +++ b/liteflow-testcase-el/liteflow-testcase-el-redis-springboot/src/test/java/com/yomahub/liteflow/test/redis/RedisWithXmlELPollSpringbootTest.java @@ -11,9 +11,7 @@ import com.yomahub.liteflow.parser.redis.mode.RClient; import com.yomahub.liteflow.parser.redis.mode.polling.RedisParserPollingMode; import com.yomahub.liteflow.slot.DefaultContext; import com.yomahub.liteflow.test.BaseTest; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.*; import org.junit.jupiter.api.extension.ExtendWith; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.test.context.SpringBootTest; diff --git a/liteflow-testcase-el/liteflow-testcase-el-script-javaxpro-springboot/src/test/java/com/yomahub/liteflow/test/script/javapro/parseOneMode/ScriptJavaxProParseOneModeTest.java b/liteflow-testcase-el/liteflow-testcase-el-script-javaxpro-springboot/src/test/java/com/yomahub/liteflow/test/script/javapro/parseOneMode/ScriptJavaxProParseOneModeTest.java index 8d9d3887c..734a0ff1d 100644 --- a/liteflow-testcase-el/liteflow-testcase-el-script-javaxpro-springboot/src/test/java/com/yomahub/liteflow/test/script/javapro/parseOneMode/ScriptJavaxProParseOneModeTest.java +++ b/liteflow-testcase-el/liteflow-testcase-el-script-javaxpro-springboot/src/test/java/com/yomahub/liteflow/test/script/javapro/parseOneMode/ScriptJavaxProParseOneModeTest.java @@ -2,12 +2,14 @@ package com.yomahub.liteflow.test.script.javapro.parseOneMode; import com.yomahub.liteflow.core.FlowExecutor; import com.yomahub.liteflow.flow.LiteflowResponse; +import com.yomahub.liteflow.slot.DefaultContext; import com.yomahub.liteflow.test.BaseTest; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.context.annotation.ComponentScan; import org.springframework.test.context.TestPropertySource; import org.springframework.test.context.junit.jupiter.SpringExtension;