!337 feature #ICM6TX 增加 WHEN 并行策略执行逻辑,增加 percentage 关键字

Merge pull request !337 from luoyi/issues/ICM6TX
This commit is contained in:
铂赛东
2025-08-21 11:09:27 +00:00
committed by Gitee
15 changed files with 305 additions and 18 deletions

View File

@@ -1,7 +1,10 @@
package com.yomahub.liteflow.builder.el;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.*;
import cn.hutool.core.util.ArrayUtil;
import cn.hutool.core.util.CharUtil;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.crypto.digest.MD5;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -29,7 +32,10 @@ import com.yomahub.liteflow.property.LiteflowConfig;
import com.yomahub.liteflow.property.LiteflowConfigGetter;
import com.yomahub.liteflow.util.ElRegexUtil;
import java.util.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
/**
@@ -38,6 +44,7 @@ import java.util.*;
* @author Bryan.Zhang
* @author Jay li
* @author jason
* @author luo yi
* @since 2.8.0
*/
public class LiteFlowChainELBuilder {
@@ -92,6 +99,7 @@ public class LiteFlowChainELBuilder {
EXPRESS_RUNNER.addFunctionAndClassMethod(ChainConstant.TAG, Object.class, new TagOperator());
EXPRESS_RUNNER.addFunctionAndClassMethod(ChainConstant.ANY, Object.class, new AnyOperator());
EXPRESS_RUNNER.addFunctionAndClassMethod(ChainConstant.MUST, Object.class, new MustOperator());
EXPRESS_RUNNER.addFunctionAndClassMethod(ChainConstant.PERCENTAGE, Object.class, new PercentageOperator());
EXPRESS_RUNNER.addFunctionAndClassMethod(ChainConstant.ID, Object.class, new IdOperator());
EXPRESS_RUNNER.addFunctionAndClassMethod(ChainConstant.IGNORE_ERROR, Object.class, new IgnoreErrorOperator());
EXPRESS_RUNNER.addFunctionAndClassMethod(ChainConstant.THREAD_POOL, Object.class, new ThreadPoolOperator());

View File

@@ -0,0 +1,35 @@
package com.yomahub.liteflow.builder.el.operator;
import com.ql.util.express.exception.QLException;
import com.yomahub.liteflow.builder.el.operator.base.BaseOperator;
import com.yomahub.liteflow.builder.el.operator.base.OperatorHelper;
import com.yomahub.liteflow.enums.ParallelStrategyEnum;
import com.yomahub.liteflow.flow.element.condition.WhenCondition;
/**
* EL 规则中的 percentage 的操作符
*
* @author luo yi
* @since 2.13.4
*/
public class PercentageOperator extends BaseOperator<WhenCondition> {
@Override
public WhenCondition build(Object[] objects) throws Exception {
OperatorHelper.checkObjectSizeEqTwo(objects);
WhenCondition whenCondition = OperatorHelper.convert(objects[0], WhenCondition.class, "The caller must be WhenCondition item");
// 指定并行任务需要完成的阈值
Double percentage = OperatorHelper.convert2Double(objects[1]);
if (percentage > 1 || percentage < 0) {
throw new QLException("The percentage must be between 0 and 1.");
}
whenCondition.setParallelStrategy(ParallelStrategyEnum.PERCENTAGE);
whenCondition.setPercentage(percentage);
return whenCondition;
}
}

View File

@@ -17,6 +17,7 @@ import java.util.Objects;
* Operator 常用工具类
*
* @author gaibu
* @author luo yi
* @since 2.8.6
*/
public class OperatorHelper {
@@ -109,6 +110,18 @@ public class OperatorHelper {
return convert(object, clazz, errorMsg);
}
public static Double convert2Double(Object object) throws QLException {
if (object instanceof Number) {
// 对 float 特别处理,避免精度问题
if (object instanceof Float) {
// 使用字符串转换避免 float 精度损失
return Double.parseDouble(Float.toString((Float) object));
}
return ((Number) object).doubleValue();
}
throw new QLException(StrUtil.format("Unsupported type: {}, it must be numeric type.", object.getClass().getName()));
}
/**
* 转换 object 为指定的类型,自定义错误信息 如果是Node类型的则进行copy
*/

View File

@@ -45,6 +45,8 @@ public interface ChainConstant {
String MUST = "must";
String PERCENTAGE = "percentage";
String TYPE = "type";
String THEN = "THEN";

View File

@@ -22,14 +22,14 @@ import com.yomahub.liteflow.enums.ParseModeEnum;
import com.yomahub.liteflow.exception.*;
import com.yomahub.liteflow.flow.FlowBus;
import com.yomahub.liteflow.flow.LiteflowResponse;
import com.yomahub.liteflow.lifecycle.PostProcessChainExecuteLifeCycle;
import com.yomahub.liteflow.lifecycle.impl.ChainCacheLifeCycle;
import com.yomahub.liteflow.flow.element.Chain;
import com.yomahub.liteflow.flow.element.Node;
import com.yomahub.liteflow.flow.element.Rollbackable;
import com.yomahub.liteflow.flow.entity.CmpStep;
import com.yomahub.liteflow.flow.id.IdGeneratorHolder;
import com.yomahub.liteflow.lifecycle.LifeCycleHolder;
import com.yomahub.liteflow.lifecycle.PostProcessChainExecuteLifeCycle;
import com.yomahub.liteflow.lifecycle.impl.ChainCacheLifeCycle;
import com.yomahub.liteflow.log.LFLog;
import com.yomahub.liteflow.log.LFLoggerManager;
import com.yomahub.liteflow.monitor.MonitorFile;

View File

@@ -1,9 +1,6 @@
package com.yomahub.liteflow.enums;
import com.yomahub.liteflow.flow.parallel.strategy.AllOfParallelExecutor;
import com.yomahub.liteflow.flow.parallel.strategy.AnyOfParallelExecutor;
import com.yomahub.liteflow.flow.parallel.strategy.ParallelStrategyExecutor;
import com.yomahub.liteflow.flow.parallel.strategy.SpecifyParallelExecutor;
import com.yomahub.liteflow.flow.parallel.strategy.*;
/**
* 并行策略枚举类
@@ -17,7 +14,10 @@ public enum ParallelStrategyEnum {
ALL("allOf", "完成全部任务", AllOfParallelExecutor.class),
SPECIFY("must", "完成指定 ID 任务", SpecifyParallelExecutor.class);
SPECIFY("must", "完成指定 ID 任务", SpecifyParallelExecutor.class),
PERCENTAGE("percentageOf", "完整指定阈值任务", PercentageOfParallelExecutor.class);
private String strategyType;

View File

@@ -10,23 +10,20 @@ package com.yomahub.liteflow.flow.element;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.BooleanUtil;
import cn.hutool.core.util.ObjectUtil;
import com.alibaba.ttl.TransmittableThreadLocal;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.StrUtil;
import com.alibaba.ttl.TransmittableThreadLocal;
import com.yomahub.liteflow.builder.el.LiteFlowChainELBuilder;
import com.yomahub.liteflow.common.ChainConstant;
import com.yomahub.liteflow.enums.ExecuteableTypeEnum;
import com.yomahub.liteflow.exception.ChainEndException;
import com.yomahub.liteflow.exception.FlowSystemException;
import com.yomahub.liteflow.flow.FlowBus;
import com.yomahub.liteflow.lifecycle.LifeCycleHolder;
import com.yomahub.liteflow.log.LFLog;
import com.yomahub.liteflow.log.LFLoggerManager;
import com.yomahub.liteflow.meta.LiteflowMetaOperator;
import com.yomahub.liteflow.slot.DataBus;
import com.yomahub.liteflow.slot.Slot;
import com.yomahub.liteflow.util.ElRegexUtil;
import java.util.ArrayList;
import java.util.List;

View File

@@ -51,6 +51,9 @@ public class WhenCondition extends Condition {
// 等待时间单位
private TimeUnit maxWaitTimeUnit;
// 并发任务指定阈值,取值 0 - 1
private Double percentage;
@Override
public void executeCondition(Integer slotIndex) throws Exception {
executeAsyncCondition(slotIndex);
@@ -130,4 +133,12 @@ public class WhenCondition extends Condition {
public void setMaxWaitTimeUnit(TimeUnit maxWaitTimeUnit) {
this.maxWaitTimeUnit = maxWaitTimeUnit;
}
public Double getPercentage() {
return percentage;
}
public void setPercentage(Double percentage) {
this.percentage = percentage;
}
}

View File

@@ -0,0 +1,70 @@
package com.yomahub.liteflow.flow.parallel.strategy;
import com.yomahub.liteflow.flow.element.condition.WhenCondition;
import com.yomahub.liteflow.flow.parallel.WhenFutureObj;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.LongAdder;
/**
* 完成指定阈值任务
*
* @author luo yi
* @since 2.13.4
*/
public class PercentageOfParallelExecutor extends ParallelStrategyExecutor {
@Override
public void execute(WhenCondition whenCondition, Integer slotIndex) throws Exception {
// 获取所有 CompletableFuture 任务
List<CompletableFuture<WhenFutureObj>> whenAllTaskList = this.getWhenAllTaskList(whenCondition, slotIndex);
int total = whenAllTaskList.size();
// 计算阈值数量(向上取整),为 0 时取 1表示只等待一个完成即 any
int thresholdCount = Math.max(1, (int) Math.ceil(total * whenCondition.getPercentage()));
// 已完成任务收集器
ConcurrentLinkedQueue<CompletableFuture<WhenFutureObj>> completedFutures = new ConcurrentLinkedQueue<>();
// 阈值触发门闩
CompletableFuture<Void> thresholdFuture = new CompletableFuture<>();
// 原子计数器
LongAdder completedCount = new LongAdder();
// 为每个任务添加回调
whenAllTaskList.forEach(future ->
future.whenComplete((result, ex) -> {
// 计数 +1
completedCount.increment();
int currentCount = completedCount.intValue();
if (currentCount <= thresholdCount) {
// 添加已完成任务
completedFutures.add(future);
}
// 达到阈值时触发门闩(确保只触发一次)
if (currentCount >= thresholdCount && !thresholdFuture.isDone()) {
thresholdFuture.complete(null);
}
})
);
// 创建组合任务(仅包含已完成任务)
CompletableFuture<Void> combinedTask = thresholdFuture.thenRun(() -> {
// 达到阈值时创建 allOf 任务
CompletableFuture.allOf(completedFutures.toArray(new CompletableFuture[]{})).join();
});
// 处理结果(会阻塞直到阈值任务完成)
this.handleTaskResult(whenCondition, slotIndex, whenAllTaskList, combinedTask);
}
}

View File

@@ -1,9 +1,14 @@
package com.yomahub.liteflow.builder.el;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.util.ObjUtil;
import cn.hutool.core.util.StrUtil;
import java.util.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
/**
* 并行组件
@@ -23,6 +28,7 @@ public class WhenELWrapper extends ELWrapper {
private boolean ignoreError;
private String customThreadExecutor;
private final List<String> mustExecuteList;
private Double percentage;
public WhenELWrapper(ELWrapper... elWrappers) {
this.addWrapper(elWrappers);
@@ -41,6 +47,11 @@ public class WhenELWrapper extends ELWrapper {
return this;
}
public WhenELWrapper percentage(double percentage) {
this.percentage = percentage;
return this;
}
public WhenELWrapper ignoreError(boolean ignoreError) {
this.ignoreError = ignoreError;
return this;
@@ -110,6 +121,18 @@ public class WhenELWrapper extends ELWrapper {
@Override
protected String toEL(Integer depth, StringBuilder paramContext) {
// 互斥检查:确保三个属性中最多只有一个被设置
long count = Stream.of(
this.any ? 1 : 0,
CollectionUtil.isNotEmpty(this.mustExecuteList) ? 1 : 0,
ObjUtil.isNotNull(this.percentage) ? 1 : 0
).filter(num -> num > 0).count();
if (count > 1) {
throw new IllegalArgumentException("Properties 'any', 'must', and 'percentage' are mutually exclusive. Only one can be set at a time.");
}
Integer sonDepth = depth == null ? null : depth + 1;
StringBuilder sb = new StringBuilder();
@@ -139,10 +162,6 @@ public class WhenELWrapper extends ELWrapper {
sb.append(StrUtil.format(".threadPool(\"{}\")", customThreadExecutor));
}
if(CollectionUtil.isNotEmpty(mustExecuteList)){
// 校验must 语义与 any语义冲突
if (this.any){
throw new IllegalArgumentException("'.must()' and '.any()' can use in when component at the same time!");
}
// 处理must子表达式输出
sb.append(".must(");
for(int i = 0; i < mustExecuteList.size(); i++){
@@ -154,6 +173,10 @@ public class WhenELWrapper extends ELWrapper {
sb.append(")");
}
if (ObjUtil.isNotNull(percentage)){
sb.append(StrUtil.format(".percentage({})", percentage));
}
// 处理公共属性输出
processWrapperProperty(sb, paramContext);
return sb.toString();

View File

@@ -210,4 +210,23 @@ public class WhenELBuilderTest extends BaseTest {
WhenELWrapper el = ELBus.when(ELBus.node("a"), ELBus.node("b"), ELBus.node("c")).customThreadExecutor("com.yomahub.liteflow.test.builder.customTreadExecutor.CustomThreadExecutor1");
Assertions.assertTrue(LiteFlowChainELBuilder.validate(el.toEL()));
}
// 测试互斥属性设置
@Test
public void testWHEN1(){
WhenELWrapper el = ELBus.when(ELBus.node("a"), ELBus.node("b")).percentage(0.66).any(true);
WhenELWrapper el2 = ELBus.when(ELBus.node("a"), ELBus.node("b")).must("a").any(true);
WhenELWrapper el3 = ELBus.when(ELBus.node("a"), ELBus.node("b")).must("a").percentage(0.55);
Assertions.assertThrowsExactly(IllegalArgumentException.class, el::toEL);
Assertions.assertThrowsExactly(IllegalArgumentException.class, el2::toEL);
Assertions.assertThrowsExactly(IllegalArgumentException.class, el3::toEL);
}
// 测试 percentage 属性
@Test
public void testWHEN2(){
WhenELWrapper el = ELBus.when(ELBus.node("a"), ELBus.node("b")).percentage(0.66);
Assertions.assertTrue(LiteFlowChainELBuilder.validate(el.toEL()));
}
}

View File

@@ -184,4 +184,49 @@ public class AsyncNodeELSpringbootTest extends BaseTest {
Assertions.assertTrue(context.getData("check").toString().startsWith("akbgc"));
}
// 测试 percentage 关键字percentage 为 0.6,数量为 3
@Test
public void testAsyncFlow14() throws Exception {
LiteflowResponse response = flowExecutor.execute2Resp("chain14", "it's a base request");
DefaultContext context = response.getFirstContextBean();
Assertions.assertTrue(response.isSuccess());
Assertions.assertEquals("3", context.getData("count").toString());
}
// 测试 percentage 关键字percentage 为 0相当于 any
@Test
public void testAsyncFlow15() throws Exception {
LiteflowResponse response = flowExecutor.execute2Resp("chain15", "it's a base request");
DefaultContext context = response.getFirstContextBean();
Assertions.assertTrue(response.isSuccess());
Assertions.assertEquals("1", context.getData("count").toString());
}
// 测试 percentage 关键字percentage 为 1相当于 all
@Test
public void testAsyncFlow16() throws Exception {
LiteflowResponse response = flowExecutor.execute2Resp("chain16", "it's a base request");
DefaultContext context = response.getFirstContextBean();
Assertions.assertTrue(response.isSuccess());
Assertions.assertEquals("5", context.getData("count").toString());
}
// 测试 percentage 、ignoreError 关键字
@Test
public void testAsyncFlow17() throws Exception {
LiteflowResponse response = flowExecutor.execute2Resp("chain17", "it's a base request");
DefaultContext context = response.getFirstContextBean();
Assertions.assertTrue(response.isSuccess());
Assertions.assertEquals("2", context.getData("count").toString());
}
// 测试 percentage 、ignoreError 关键字
@Test
public void testAsyncFlow18() throws Exception {
LiteflowResponse response = flowExecutor.execute2Resp("chain18", "it's a base request");
DefaultContext context = response.getFirstContextBean();
Assertions.assertFalse(response.isSuccess());
Assertions.assertEquals("2", context.getData("count").toString());
}
}

View File

@@ -0,0 +1,26 @@
package com.yomahub.liteflow.test.asyncNode.cmp;
import com.yomahub.liteflow.core.NodeComponent;
import com.yomahub.liteflow.slot.DefaultContext;
import org.springframework.stereotype.Component;
@Component("m")
public class MCmp extends NodeComponent {
@Override
public void process() throws Exception {
String seconds = this.getTag();
Thread.sleep((long) (1000 * Double.parseDouble(seconds)));
DefaultContext context = this.getFirstContextBean();
synchronized (MCmp.class) {
if (context.hasData("count")) {
Integer count = context.getData("count");
context.setData("count", ++count);
} else {
context.setData("count", 1);
}
}
System.out.println("Mcomp executed!");
}
}

View File

@@ -0,0 +1,18 @@
package com.yomahub.liteflow.test.asyncNode.cmp;
import com.yomahub.liteflow.core.NodeComponent;
import org.springframework.stereotype.Component;
@Component("n")
public class NCmp extends NodeComponent {
@Override
public void process() throws Exception {
String seconds = this.getTag();
Thread.sleep((long) (1000 * Double.parseDouble(seconds)));
// 手动抛异常
System.out.println("Ncomp executed with exeption!");
int a = 1 / 0;
}
}

View File

@@ -66,4 +66,24 @@
THEN(WHEN(d, g, l, a, THEN(k, b).id("z")).ignoreError(true).must("z", g, "task1", "task2"), c);
</chain>
<chain name="chain14">
WHEN(m.tag("1"), m.tag("2"), m.tag("3"), m.tag("4"), m.tag("5")).percentage(0.6f);
</chain>
<chain name="chain15">
WHEN(m.tag("1"), m.tag("2"), m.tag("3"), m.tag("4"), m.tag("5")).percentage(0);
</chain>
<chain name="chain16">
WHEN(m.tag("1"), m.tag("2"), m.tag("3"), m.tag("4"), m.tag("5")).percentage(1);
</chain>
<chain name="chain17">
WHEN(m.tag("1"), m.tag("2"), n.tag("2"), m.tag("3"), m.tag("4"), m.tag("5")).ignoreError(true).percentage(0.5);
</chain>
<chain name="chain18">
WHEN(THEN(m.tag("1"), n.tag("1.5")), m.tag("2"), m.tag("3"), m.tag("4"), m.tag("5")).ignoreError(false).percentage(0.4);
</chain>
</flow>