enhancement #I8YDGE 在迭代循环组件中,无法获取子流程传递的请求参数

This commit is contained in:
everywhere.z
2024-03-08 12:58:34 +08:00
parent e1f01519b8
commit a036ae00ee
2 changed files with 9 additions and 9 deletions

View File

@@ -45,16 +45,16 @@ public abstract class ParallelStrategyExecutor {
* @param executable
* @param parallelExecutor
* @param whenCondition
* @param currChainName
* @param currChainId
* @param slotIndex
* @return
*/
protected CompletableFuture<WhenFutureObj> wrappedFutureObj(Executable executable, ExecutorService parallelExecutor,
WhenCondition whenCondition, String currChainName, Integer slotIndex) {
WhenCondition whenCondition, String currChainId, Integer slotIndex) {
// 套入 CompletableFutureTimeout 方法进行超时判断,如果超时则用 WhenFutureObj.timeOut 返回超时的对象
// 第 2 个参数是主要的本体 CompletableFuture传入了 ParallelSupplier 和线程池对象
return CompletableFutureExpand.completeOnTimeout(
CompletableFuture.supplyAsync(new ParallelSupplier(executable, currChainName, slotIndex), parallelExecutor),
CompletableFuture.supplyAsync(new ParallelSupplier(executable, currChainId, slotIndex), parallelExecutor),
whenCondition.getMaxWaitTime(),
whenCondition.getMaxWaitTimeUnit(),
WhenFutureObj.timeOut(executable.getId()));
@@ -149,7 +149,7 @@ public abstract class ParallelStrategyExecutor {
*/
protected List<CompletableFuture<WhenFutureObj>> getWhenAllTaskList(WhenCondition whenCondition, Integer slotIndex) {
String currChainName = whenCondition.getCurrChainId();
String currChainId = whenCondition.getCurrChainId();
// 设置 whenCondition 参数
this.setWhenConditionParams(whenCondition);
@@ -159,8 +159,8 @@ public abstract class ParallelStrategyExecutor {
// 这里主要是做了封装 CompletableFuture 对象,用 lambda 表达式做了很多事情,这句代码要仔细理清
// 根据 condition.getNodeList() 的集合进行流处理,用 map 进行把 executable 对象转换成 List<CompletableFuture<WhenFutureObj>>
List<CompletableFuture<WhenFutureObj>> completableFutureList = filterWhenTaskList(whenCondition.getExecutableList(), slotIndex, currChainName)
.map(executable -> wrappedFutureObj(executable, parallelExecutor, whenCondition, currChainName, slotIndex))
List<CompletableFuture<WhenFutureObj>> completableFutureList = filterWhenTaskList(whenCondition.getExecutableList(), slotIndex, currChainId)
.map(executable -> wrappedFutureObj(executable, parallelExecutor, whenCondition, currChainId, slotIndex))
.collect(Collectors.toList());
return completableFutureList;

View File

@@ -20,7 +20,7 @@ public class SpecifyParallelExecutor extends ParallelStrategyExecutor {
@Override
public void execute(WhenCondition whenCondition, Integer slotIndex) throws Exception {
String currChainName = whenCondition.getCurrChainId();
String currChainId = whenCondition.getCurrChainId();
// 设置 whenCondition 参数
this.setWhenConditionParams(whenCondition);
@@ -41,10 +41,10 @@ public class SpecifyParallelExecutor extends ParallelStrategyExecutor {
List<CompletableFuture<WhenFutureObj>> allTaskList = new ArrayList<>();
// 遍历 when 所有 node进行筛选及处理
filterWhenTaskList(whenCondition.getExecutableList(), slotIndex, currChainName)
filterWhenTaskList(whenCondition.getExecutableList(), slotIndex, currChainId)
.forEach(executable -> {
// 处理 task封装成 CompletableFuture 对象
CompletableFuture<WhenFutureObj> completableFutureTask = wrappedFutureObj(executable, parallelExecutor, whenCondition, currChainName, slotIndex);
CompletableFuture<WhenFutureObj> completableFutureTask = wrappedFutureObj(executable, parallelExecutor, whenCondition, currChainId, slotIndex);
// 存在 must 指定 ID 的 task且该任务只会有一个或者没有
if (whenCondition.getSpecifyIdSet().contains(executable.getId())) {
// 设置指定任务 future 对象