diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/builder/el/LiteFlowChainELBuilder.java b/liteflow-core/src/main/java/com/yomahub/liteflow/builder/el/LiteFlowChainELBuilder.java index 0a4f426d7..b777752db 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/builder/el/LiteFlowChainELBuilder.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/builder/el/LiteFlowChainELBuilder.java @@ -1,7 +1,10 @@ package com.yomahub.liteflow.builder.el; import cn.hutool.core.collection.CollUtil; -import cn.hutool.core.util.*; +import cn.hutool.core.util.ArrayUtil; +import cn.hutool.core.util.CharUtil; +import cn.hutool.core.util.ObjectUtil; +import cn.hutool.core.util.StrUtil; import com.fasterxml.jackson.databind.JsonMappingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.ql.util.express.DefaultContext; @@ -27,7 +30,10 @@ import com.yomahub.liteflow.log.LFLoggerManager; import com.yomahub.liteflow.property.LiteflowConfig; import com.yomahub.liteflow.property.LiteflowConfigGetter; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; /** @@ -36,6 +42,7 @@ import java.util.*; * @author Bryan.Zhang * @author Jay li * @author jason + * @author luo yi * @since 2.8.0 */ public class LiteFlowChainELBuilder { @@ -90,6 +97,7 @@ public class LiteFlowChainELBuilder { EXPRESS_RUNNER.addFunctionAndClassMethod(ChainConstant.TAG, Object.class, new TagOperator()); EXPRESS_RUNNER.addFunctionAndClassMethod(ChainConstant.ANY, Object.class, new AnyOperator()); EXPRESS_RUNNER.addFunctionAndClassMethod(ChainConstant.MUST, Object.class, new MustOperator()); + EXPRESS_RUNNER.addFunctionAndClassMethod(ChainConstant.PERCENTAGE, Object.class, new PercentageOperator()); EXPRESS_RUNNER.addFunctionAndClassMethod(ChainConstant.ID, Object.class, new IdOperator()); EXPRESS_RUNNER.addFunctionAndClassMethod(ChainConstant.IGNORE_ERROR, Object.class, new IgnoreErrorOperator()); EXPRESS_RUNNER.addFunctionAndClassMethod(ChainConstant.THREAD_POOL, Object.class, new ThreadPoolOperator()); diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/builder/el/operator/PercentageOperator.java b/liteflow-core/src/main/java/com/yomahub/liteflow/builder/el/operator/PercentageOperator.java new file mode 100644 index 000000000..0bb172f3a --- /dev/null +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/builder/el/operator/PercentageOperator.java @@ -0,0 +1,35 @@ +package com.yomahub.liteflow.builder.el.operator; + +import com.ql.util.express.exception.QLException; +import com.yomahub.liteflow.builder.el.operator.base.BaseOperator; +import com.yomahub.liteflow.builder.el.operator.base.OperatorHelper; +import com.yomahub.liteflow.enums.ParallelStrategyEnum; +import com.yomahub.liteflow.flow.element.condition.WhenCondition; + +/** + * EL 规则中的 percentage 的操作符 + * + * @author luo yi + * @since 2.13.4 + */ +public class PercentageOperator extends BaseOperator { + + @Override + public WhenCondition build(Object[] objects) throws Exception { + OperatorHelper.checkObjectSizeEqTwo(objects); + + WhenCondition whenCondition = OperatorHelper.convert(objects[0], WhenCondition.class, "The caller must be WhenCondition item"); + + // 指定并行任务需要完成的阈值 + Double percentage = OperatorHelper.convert2Double(objects[1]); + + if (percentage > 1 || percentage < 0) { + throw new QLException("The percentage must be between 0 and 1."); + } + + whenCondition.setParallelStrategy(ParallelStrategyEnum.PERCENTAGE); + whenCondition.setPercentage(percentage); + return whenCondition; + } + +} diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/builder/el/operator/base/OperatorHelper.java b/liteflow-core/src/main/java/com/yomahub/liteflow/builder/el/operator/base/OperatorHelper.java index 712bced8d..a407639ba 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/builder/el/operator/base/OperatorHelper.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/builder/el/operator/base/OperatorHelper.java @@ -17,6 +17,7 @@ import java.util.Objects; * Operator 常用工具类 * * @author gaibu + * @author luo yi * @since 2.8.6 */ public class OperatorHelper { @@ -109,6 +110,18 @@ public class OperatorHelper { return convert(object, clazz, errorMsg); } + public static Double convert2Double(Object object) throws QLException { + if (object instanceof Number) { + // 对 float 特别处理,避免精度问题 + if (object instanceof Float) { + // 使用字符串转换避免 float 精度损失 + return Double.parseDouble(Float.toString((Float) object)); + } + return ((Number) object).doubleValue(); + } + throw new QLException(StrUtil.format("Unsupported type: {}, it must be numeric type.", object.getClass().getName())); + } + /** * 转换 object 为指定的类型,自定义错误信息 如果是Node类型的则进行copy */ diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/common/ChainConstant.java b/liteflow-core/src/main/java/com/yomahub/liteflow/common/ChainConstant.java index 492c7094f..2f7dd3223 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/common/ChainConstant.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/common/ChainConstant.java @@ -45,6 +45,8 @@ public interface ChainConstant { String MUST = "must"; + String PERCENTAGE = "percentage"; + String TYPE = "type"; String THEN = "THEN"; diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/enums/ParallelStrategyEnum.java b/liteflow-core/src/main/java/com/yomahub/liteflow/enums/ParallelStrategyEnum.java index 09c9af437..835e2a5a8 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/enums/ParallelStrategyEnum.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/enums/ParallelStrategyEnum.java @@ -1,9 +1,6 @@ package com.yomahub.liteflow.enums; -import com.yomahub.liteflow.flow.parallel.strategy.AllOfParallelExecutor; -import com.yomahub.liteflow.flow.parallel.strategy.AnyOfParallelExecutor; -import com.yomahub.liteflow.flow.parallel.strategy.ParallelStrategyExecutor; -import com.yomahub.liteflow.flow.parallel.strategy.SpecifyParallelExecutor; +import com.yomahub.liteflow.flow.parallel.strategy.*; /** * 并行策略枚举类 @@ -17,7 +14,10 @@ public enum ParallelStrategyEnum { ALL("allOf", "完成全部任务", AllOfParallelExecutor.class), - SPECIFY("must", "完成指定 ID 任务", SpecifyParallelExecutor.class); + SPECIFY("must", "完成指定 ID 任务", SpecifyParallelExecutor.class), + + PERCENTAGE("percentageOf", "完整指定阈值任务", PercentageOfParallelExecutor.class); + private String strategyType; diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/WhenCondition.java b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/WhenCondition.java index 15eb31cbd..69e544252 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/WhenCondition.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/WhenCondition.java @@ -51,6 +51,9 @@ public class WhenCondition extends Condition { // 等待时间单位 private TimeUnit maxWaitTimeUnit; + // 并发任务指定阈值,取值 0 - 1 + private Double percentage; + @Override public void executeCondition(Integer slotIndex) throws Exception { executeAsyncCondition(slotIndex); @@ -130,4 +133,12 @@ public class WhenCondition extends Condition { public void setMaxWaitTimeUnit(TimeUnit maxWaitTimeUnit) { this.maxWaitTimeUnit = maxWaitTimeUnit; } + + public Double getPercentage() { + return percentage; + } + + public void setPercentage(Double percentage) { + this.percentage = percentage; + } } diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/PercentageOfParallelExecutor.java b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/PercentageOfParallelExecutor.java new file mode 100644 index 000000000..cbed7a4c7 --- /dev/null +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/PercentageOfParallelExecutor.java @@ -0,0 +1,66 @@ +package com.yomahub.liteflow.flow.parallel.strategy; + +import com.yomahub.liteflow.flow.element.condition.WhenCondition; +import com.yomahub.liteflow.flow.parallel.WhenFutureObj; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * 完成指定阈值任务 + * + * @author luo yi + * @since 2.13.4 + */ +public class PercentageOfParallelExecutor extends ParallelStrategyExecutor { + + @Override + public void execute(WhenCondition whenCondition, Integer slotIndex) throws Exception { + + // 获取所有 CompletableFuture 任务 + List> whenAllTaskList = this.getWhenAllTaskList(whenCondition, slotIndex); + + int total = whenAllTaskList.size(); + + // 计算阈值数量(向上取整) + int thresholdCount = (int) Math.ceil(total * whenCondition.getPercentage()); + + // 已完成任务收集器(对 List 加锁保证线程安全) + List> completedFutures = Collections.synchronizedList(new ArrayList<>(thresholdCount)); + + // 阈值触发门闩 + CompletableFuture thresholdFuture = new CompletableFuture<>(); + + // 原子计数器 + AtomicInteger completedCount = new AtomicInteger(0); + + // 为每个任务添加回调 + whenAllTaskList.forEach(future -> + future.whenComplete((result, ex) -> { + // 安全添加已完成任务 + completedFutures.add(future); + // 检查是否达到阈值 + if (completedCount.incrementAndGet() >= thresholdCount) { + // 确保只触发一次 + if (!thresholdFuture.isDone()) { + thresholdFuture.complete(null); + } + } + }) + ); + + // 创建组合任务(仅包含已完成任务) + CompletableFuture combinedTask = thresholdFuture.thenRun(() -> { + // 达到阈值时创建 allOf 任务 + CompletableFuture.allOf(completedFutures.toArray(new CompletableFuture[]{})).join(); + }); + + // 处理结果(会阻塞直到阈值任务完成) + this.handleTaskResult(whenCondition, slotIndex, whenAllTaskList, combinedTask); + + } + +} diff --git a/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/asyncNode/AsyncNodeELSpringbootTest.java b/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/asyncNode/AsyncNodeELSpringbootTest.java index 37e0413d2..9dfe3a0a1 100644 --- a/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/asyncNode/AsyncNodeELSpringbootTest.java +++ b/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/asyncNode/AsyncNodeELSpringbootTest.java @@ -184,4 +184,49 @@ public class AsyncNodeELSpringbootTest extends BaseTest { Assertions.assertTrue(context.getData("check").toString().startsWith("akbgc")); } + // 测试 percentage 关键字,percentage 为 0.6,数量为 3 + @Test + public void testAsyncFlow14() throws Exception { + LiteflowResponse response = flowExecutor.execute2Resp("chain14", "it's a base request"); + DefaultContext context = response.getFirstContextBean(); + Assertions.assertTrue(response.isSuccess()); + Assertions.assertEquals("3", context.getData("count").toString()); + } + + // 测试 percentage 关键字,percentage 为 0,相当于 any + @Test + public void testAsyncFlow15() throws Exception { + LiteflowResponse response = flowExecutor.execute2Resp("chain15", "it's a base request"); + DefaultContext context = response.getFirstContextBean(); + Assertions.assertTrue(response.isSuccess()); + Assertions.assertEquals("1", context.getData("count").toString()); + } + + // 测试 percentage 关键字,percentage 为 1,相当于 all + @Test + public void testAsyncFlow16() throws Exception { + LiteflowResponse response = flowExecutor.execute2Resp("chain16", "it's a base request"); + DefaultContext context = response.getFirstContextBean(); + Assertions.assertTrue(response.isSuccess()); + Assertions.assertEquals("5", context.getData("count").toString()); + } + + // 测试 percentage 、ignoreError 关键字 + @Test + public void testAsyncFlow17() throws Exception { + LiteflowResponse response = flowExecutor.execute2Resp("chain17", "it's a base request"); + DefaultContext context = response.getFirstContextBean(); + Assertions.assertTrue(response.isSuccess()); + Assertions.assertEquals("2", context.getData("count").toString()); + } + + // 测试 percentage 、ignoreError 关键字 + @Test + public void testAsyncFlow18() throws Exception { + LiteflowResponse response = flowExecutor.execute2Resp("chain18", "it's a base request"); + DefaultContext context = response.getFirstContextBean(); + Assertions.assertFalse(response.isSuccess()); + Assertions.assertEquals("2", context.getData("count").toString()); + } + } diff --git a/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/asyncNode/cmp/MCmp.java b/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/asyncNode/cmp/MCmp.java new file mode 100644 index 000000000..f12da4255 --- /dev/null +++ b/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/asyncNode/cmp/MCmp.java @@ -0,0 +1,26 @@ +package com.yomahub.liteflow.test.asyncNode.cmp; + +import com.yomahub.liteflow.core.NodeComponent; +import com.yomahub.liteflow.slot.DefaultContext; +import org.springframework.stereotype.Component; + +@Component("m") +public class MCmp extends NodeComponent { + + @Override + public void process() throws Exception { + String seconds = this.getTag(); + Thread.sleep((long) (1000 * Double.parseDouble(seconds))); + DefaultContext context = this.getFirstContextBean(); + synchronized (MCmp.class) { + if (context.hasData("count")) { + Integer count = context.getData("count"); + context.setData("count", ++count); + } else { + context.setData("count", 1); + } + } + System.out.println("Mcomp executed!"); + } + +} diff --git a/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/asyncNode/cmp/NCmp.java b/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/asyncNode/cmp/NCmp.java new file mode 100644 index 000000000..5794cfa43 --- /dev/null +++ b/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/asyncNode/cmp/NCmp.java @@ -0,0 +1,18 @@ +package com.yomahub.liteflow.test.asyncNode.cmp; + +import com.yomahub.liteflow.core.NodeComponent; +import org.springframework.stereotype.Component; + +@Component("n") +public class NCmp extends NodeComponent { + + @Override + public void process() throws Exception { + String seconds = this.getTag(); + Thread.sleep((long) (1000 * Double.parseDouble(seconds))); + // 手动抛异常 + System.out.println("Ncomp executed with exeption!"); + int a = 1 / 0; + } + +} diff --git a/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/resources/asyncNode/flow1.xml b/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/resources/asyncNode/flow1.xml index b02c48cf8..fae6b7c46 100644 --- a/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/resources/asyncNode/flow1.xml +++ b/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/resources/asyncNode/flow1.xml @@ -66,4 +66,24 @@ THEN(WHEN(d, g, l, a, THEN(k, b).id("z")).ignoreError(true).must("z", g, "task1", "task2"), c); + + WHEN(m.tag("1"), m.tag("2"), m.tag("3"), m.tag("4"), m.tag("5")).percentage(0.6f); + + + + WHEN(m.tag("1"), m.tag("2"), m.tag("3"), m.tag("4"), m.tag("5")).percentage(0); + + + + WHEN(m.tag("1"), m.tag("2"), m.tag("3"), m.tag("4"), m.tag("5")).percentage(1); + + + + WHEN(m.tag("1"), m.tag("2"), n.tag("2"), m.tag("3"), m.tag("4"), m.tag("5")).ignoreError(true).percentage(0.5); + + + + WHEN(THEN(m.tag("1"), n.tag("1.5")), m.tag("2"), m.tag("3"), m.tag("4"), m.tag("5")).ignoreError(false).percentage(0.4); + + \ No newline at end of file