feature #ICM6TX 增加 WHEN 并行策略执行逻辑,增加 percentage 关键字

This commit is contained in:
luoyi
2025-07-14 23:19:23 +08:00
parent b3537f77bb
commit 6fe8707178
11 changed files with 251 additions and 7 deletions

View File

@@ -1,7 +1,10 @@
package com.yomahub.liteflow.builder.el;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.*;
import cn.hutool.core.util.ArrayUtil;
import cn.hutool.core.util.CharUtil;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.ql.util.express.DefaultContext;
@@ -27,7 +30,10 @@ import com.yomahub.liteflow.log.LFLoggerManager;
import com.yomahub.liteflow.property.LiteflowConfig;
import com.yomahub.liteflow.property.LiteflowConfigGetter;
import java.util.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
/**
@@ -36,6 +42,7 @@ import java.util.*;
* @author Bryan.Zhang
* @author Jay li
* @author jason
* @author luo yi
* @since 2.8.0
*/
public class LiteFlowChainELBuilder {
@@ -90,6 +97,7 @@ public class LiteFlowChainELBuilder {
EXPRESS_RUNNER.addFunctionAndClassMethod(ChainConstant.TAG, Object.class, new TagOperator());
EXPRESS_RUNNER.addFunctionAndClassMethod(ChainConstant.ANY, Object.class, new AnyOperator());
EXPRESS_RUNNER.addFunctionAndClassMethod(ChainConstant.MUST, Object.class, new MustOperator());
EXPRESS_RUNNER.addFunctionAndClassMethod(ChainConstant.PERCENTAGE, Object.class, new PercentageOperator());
EXPRESS_RUNNER.addFunctionAndClassMethod(ChainConstant.ID, Object.class, new IdOperator());
EXPRESS_RUNNER.addFunctionAndClassMethod(ChainConstant.IGNORE_ERROR, Object.class, new IgnoreErrorOperator());
EXPRESS_RUNNER.addFunctionAndClassMethod(ChainConstant.THREAD_POOL, Object.class, new ThreadPoolOperator());

View File

@@ -0,0 +1,35 @@
package com.yomahub.liteflow.builder.el.operator;
import com.ql.util.express.exception.QLException;
import com.yomahub.liteflow.builder.el.operator.base.BaseOperator;
import com.yomahub.liteflow.builder.el.operator.base.OperatorHelper;
import com.yomahub.liteflow.enums.ParallelStrategyEnum;
import com.yomahub.liteflow.flow.element.condition.WhenCondition;
/**
* EL 规则中的 percentage 的操作符
*
* @author luo yi
* @since 2.13.4
*/
public class PercentageOperator extends BaseOperator<WhenCondition> {
@Override
public WhenCondition build(Object[] objects) throws Exception {
OperatorHelper.checkObjectSizeEqTwo(objects);
WhenCondition whenCondition = OperatorHelper.convert(objects[0], WhenCondition.class, "The caller must be WhenCondition item");
// 指定并行任务需要完成的阈值
Double percentage = OperatorHelper.convert2Double(objects[1]);
if (percentage > 1 || percentage < 0) {
throw new QLException("The percentage must be between 0 and 1.");
}
whenCondition.setParallelStrategy(ParallelStrategyEnum.PERCENTAGE);
whenCondition.setPercentage(percentage);
return whenCondition;
}
}

View File

@@ -17,6 +17,7 @@ import java.util.Objects;
* Operator 常用工具类
*
* @author gaibu
* @author luo yi
* @since 2.8.6
*/
public class OperatorHelper {
@@ -109,6 +110,18 @@ public class OperatorHelper {
return convert(object, clazz, errorMsg);
}
public static Double convert2Double(Object object) throws QLException {
if (object instanceof Number) {
// 对 float 特别处理,避免精度问题
if (object instanceof Float) {
// 使用字符串转换避免 float 精度损失
return Double.parseDouble(Float.toString((Float) object));
}
return ((Number) object).doubleValue();
}
throw new QLException(StrUtil.format("Unsupported type: {}, it must be numeric type.", object.getClass().getName()));
}
/**
* 转换 object 为指定的类型,自定义错误信息 如果是Node类型的则进行copy
*/

View File

@@ -45,6 +45,8 @@ public interface ChainConstant {
String MUST = "must";
String PERCENTAGE = "percentage";
String TYPE = "type";
String THEN = "THEN";

View File

@@ -1,9 +1,6 @@
package com.yomahub.liteflow.enums;
import com.yomahub.liteflow.flow.parallel.strategy.AllOfParallelExecutor;
import com.yomahub.liteflow.flow.parallel.strategy.AnyOfParallelExecutor;
import com.yomahub.liteflow.flow.parallel.strategy.ParallelStrategyExecutor;
import com.yomahub.liteflow.flow.parallel.strategy.SpecifyParallelExecutor;
import com.yomahub.liteflow.flow.parallel.strategy.*;
/**
* 并行策略枚举类
@@ -17,7 +14,10 @@ public enum ParallelStrategyEnum {
ALL("allOf", "完成全部任务", AllOfParallelExecutor.class),
SPECIFY("must", "完成指定 ID 任务", SpecifyParallelExecutor.class);
SPECIFY("must", "完成指定 ID 任务", SpecifyParallelExecutor.class),
PERCENTAGE("percentageOf", "完整指定阈值任务", PercentageOfParallelExecutor.class);
private String strategyType;

View File

@@ -51,6 +51,9 @@ public class WhenCondition extends Condition {
// 等待时间单位
private TimeUnit maxWaitTimeUnit;
// 并发任务指定阈值,取值 0 - 1
private Double percentage;
@Override
public void executeCondition(Integer slotIndex) throws Exception {
executeAsyncCondition(slotIndex);
@@ -130,4 +133,12 @@ public class WhenCondition extends Condition {
public void setMaxWaitTimeUnit(TimeUnit maxWaitTimeUnit) {
this.maxWaitTimeUnit = maxWaitTimeUnit;
}
public Double getPercentage() {
return percentage;
}
public void setPercentage(Double percentage) {
this.percentage = percentage;
}
}

View File

@@ -0,0 +1,66 @@
package com.yomahub.liteflow.flow.parallel.strategy;
import com.yomahub.liteflow.flow.element.condition.WhenCondition;
import com.yomahub.liteflow.flow.parallel.WhenFutureObj;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 完成指定阈值任务
*
* @author luo yi
* @since 2.13.4
*/
public class PercentageOfParallelExecutor extends ParallelStrategyExecutor {
@Override
public void execute(WhenCondition whenCondition, Integer slotIndex) throws Exception {
// 获取所有 CompletableFuture 任务
List<CompletableFuture<WhenFutureObj>> whenAllTaskList = this.getWhenAllTaskList(whenCondition, slotIndex);
int total = whenAllTaskList.size();
// 计算阈值数量(向上取整)
int thresholdCount = (int) Math.ceil(total * whenCondition.getPercentage());
// 已完成任务收集器(对 List 加锁保证线程安全)
List<CompletableFuture<WhenFutureObj>> completedFutures = Collections.synchronizedList(new ArrayList<>(thresholdCount));
// 阈值触发门闩
CompletableFuture<Void> thresholdFuture = new CompletableFuture<>();
// 原子计数器
AtomicInteger completedCount = new AtomicInteger(0);
// 为每个任务添加回调
whenAllTaskList.forEach(future ->
future.whenComplete((result, ex) -> {
// 安全添加已完成任务
completedFutures.add(future);
// 检查是否达到阈值
if (completedCount.incrementAndGet() >= thresholdCount) {
// 确保只触发一次
if (!thresholdFuture.isDone()) {
thresholdFuture.complete(null);
}
}
})
);
// 创建组合任务(仅包含已完成任务)
CompletableFuture<Void> combinedTask = thresholdFuture.thenRun(() -> {
// 达到阈值时创建 allOf 任务
CompletableFuture.allOf(completedFutures.toArray(new CompletableFuture[]{})).join();
});
// 处理结果(会阻塞直到阈值任务完成)
this.handleTaskResult(whenCondition, slotIndex, whenAllTaskList, combinedTask);
}
}