From a27397c9d46551e4fc901467ee3d82241fb98355 Mon Sep 17 00:00:00 2001 From: jason <2353220944@qq.com> Date: Mon, 28 Oct 2024 20:01:49 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9E=E5=BC=82=E6=AD=A5=E5=BE=AA?= =?UTF-8?q?=E7=8E=AFcondition=E5=B1=82=E7=BA=A7=E7=BA=BF=E7=A8=8B=E6=B1=A0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../el/operator/ThreadPoolOperator.java | 30 ++++++++++++++----- .../flow/element/condition/ForCondition.java | 3 +- .../element/condition/IteratorCondition.java | 3 +- .../flow/element/condition/LoopCondition.java | 10 +++++++ .../element/condition/WhileCondition.java | 2 +- .../liteflow/thread/ExecutorHelper.java | 14 ++++++--- .../ConditionThreadPoolELSpringbootTest.java | 6 ++-- .../CustomLoopThreadExecutor.java | 23 ++++++++++++++ .../resources/chainThreadPool/flow2.el.xml | 8 ++--- 9 files changed, 77 insertions(+), 22 deletions(-) create mode 100644 liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/chainThreadPool/CustomLoopThreadExecutor.java diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/builder/el/operator/ThreadPoolOperator.java b/liteflow-core/src/main/java/com/yomahub/liteflow/builder/el/operator/ThreadPoolOperator.java index d6ad98b2f..0a65e27df 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/builder/el/operator/ThreadPoolOperator.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/builder/el/operator/ThreadPoolOperator.java @@ -1,27 +1,43 @@ package com.yomahub.liteflow.builder.el.operator; +import com.ql.util.express.exception.QLException; import com.yomahub.liteflow.builder.el.operator.base.BaseOperator; import com.yomahub.liteflow.builder.el.operator.base.OperatorHelper; +import com.yomahub.liteflow.flow.element.Condition; +import com.yomahub.liteflow.flow.element.condition.LoopCondition; import com.yomahub.liteflow.flow.element.condition.WhenCondition; /** - * EL规则中的threadPool的操作符 + * EL规则中的threadPool的操作符 有四种用法 WHEN().threadPool() FOR...DO().threadPool() WHILE...DO.threadPool() ITERATOR...DO + * .threadPool() * * @author Bryan.Zhang * @since 2.8.0 */ -public class ThreadPoolOperator extends BaseOperator { +public class ThreadPoolOperator extends BaseOperator { @Override - public WhenCondition build(Object[] objects) throws Exception { + public Condition build(Object[] objects) throws Exception { OperatorHelper.checkObjectSizeEqTwo(objects); - String errorMsg = "The caller must be WhenCondition item"; - WhenCondition whenCondition = OperatorHelper.convert(objects[0], WhenCondition.class, errorMsg); + if (objects[0] instanceof WhenCondition) { + String errorMsg = "The caller must be WhenCondition item"; - whenCondition.setThreadExecutorClass(OperatorHelper.convert(objects[1], String.class)); + WhenCondition condition = OperatorHelper.convert(objects[0], WhenCondition.class, errorMsg); - return whenCondition; + condition.setThreadExecutorClass(OperatorHelper.convert(objects[1], String.class)); + return condition; + } else if (objects[0] instanceof LoopCondition) { + String errorMsg = "The caller must be LoopCondition item"; + + LoopCondition condition = OperatorHelper.convert(objects[0], LoopCondition.class, errorMsg); + + condition.setThreadPoolExecutorClass(OperatorHelper.convert(objects[1], String.class)); + return condition; + } else { + String errorMsg = "The caller must be LoopCondition or WhenCondition item"; + throw new QLException(errorMsg); + } } } diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/ForCondition.java b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/ForCondition.java index f77ee4017..b8dd9429f 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/ForCondition.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/ForCondition.java @@ -77,7 +77,8 @@ public class ForCondition extends LoopCondition { //存储所有的并行执行子项的CompletableFuture List> futureList = new ArrayList<>(); //获取并行循环的线程池 - ExecutorService parallelExecutor = ExecutorHelper.loadInstance().buildLoopParallelExecutor(slotIndex); + ExecutorService parallelExecutor = ExecutorHelper.loadInstance().buildLoopParallelExecutor(this, + slotIndex); for (int i = 0; i < forCount; i++){ //提交异步任务 CompletableFuture future = diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/IteratorCondition.java b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/IteratorCondition.java index f0efae8c0..0cbf7645d 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/IteratorCondition.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/IteratorCondition.java @@ -80,7 +80,8 @@ public class IteratorCondition extends LoopCondition { //存储所有的并行执行子项的CompletableFuture List> futureList = new ArrayList<>(); //获取并行循环的线程池 - ExecutorService parallelExecutor = ExecutorHelper.loadInstance().buildLoopParallelExecutor(slotIndex); + ExecutorService parallelExecutor = ExecutorHelper.loadInstance().buildLoopParallelExecutor(this, + slotIndex); while (it.hasNext()) { Object itObj = it.next(); //提交异步任务 diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/LoopCondition.java b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/LoopCondition.java index 5242ee839..1623e90c3 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/LoopCondition.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/LoopCondition.java @@ -20,6 +20,8 @@ import java.util.function.Supplier; public abstract class LoopCondition extends Condition { //判断循环是否并行执行,默认为false private boolean parallel = false; + //loop condition层级的线程池 + private String threadPoolExecutorClass; protected Executable getBreakItem() { return this.getExecutableOne(ConditionKey.BREAK_KEY); @@ -37,6 +39,14 @@ public abstract class LoopCondition extends Condition { this.addExecutable(ConditionKey.DO_KEY, executable); } + public String getThreadPoolExecutorClass() { + return threadPoolExecutorClass; + } + + public void setThreadPoolExecutorClass(String threadPoolExecutorClass) { + this.threadPoolExecutorClass = threadPoolExecutorClass; + } + protected void setLoopIndex(Executable executableItem, int index) { if (executableItem instanceof Chain) { ((Chain) executableItem).getConditionList().forEach(condition -> setLoopIndex(condition, index)); diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/WhileCondition.java b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/WhileCondition.java index 145489c10..8e109632a 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/WhileCondition.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/condition/WhileCondition.java @@ -61,7 +61,7 @@ public class WhileCondition extends LoopCondition { //并行循环逻辑 List> futureList = new ArrayList<>(); //获取并行循环的线程池 - ExecutorService parallelExecutor = ExecutorHelper.loadInstance().buildLoopParallelExecutor(slotIndex); + ExecutorService parallelExecutor = ExecutorHelper.loadInstance().buildLoopParallelExecutor(this, slotIndex); while (getWhileResult(slotIndex, index)){ CompletableFuture future = CompletableFuture.supplyAsync(new LoopParallelSupplier(executableItem, this.getCurrChainId(), slotIndex, index), parallelExecutor); diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/thread/ExecutorHelper.java b/liteflow-core/src/main/java/com/yomahub/liteflow/thread/ExecutorHelper.java index 9f8d75104..f82d3cb61 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/thread/ExecutorHelper.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/thread/ExecutorHelper.java @@ -14,6 +14,7 @@ import cn.hutool.core.util.StrUtil; import com.yomahub.liteflow.exception.ThreadExecutorServiceCreateException; import com.yomahub.liteflow.flow.FlowBus; import com.yomahub.liteflow.flow.element.Chain; +import com.yomahub.liteflow.flow.element.condition.LoopCondition; import com.yomahub.liteflow.log.LFLog; import com.yomahub.liteflow.log.LFLoggerManager; import com.yomahub.liteflow.property.LiteflowConfig; @@ -134,25 +135,30 @@ public class ExecutorHelper { } //构造并行循环的线程池 - public ExecutorService buildLoopParallelExecutor(Integer slotIndex) { + public ExecutorService buildLoopParallelExecutor(LoopCondition loopCondition, Integer slotIndex) { ExecutorService parallelExecutor; LiteflowConfig liteflowConfig = LiteflowConfigGetter.get(); //获取chain的hash String chainId = DataBus.getSlot(slotIndex).getChainId(); Chain chain = FlowBus.getChain(chainId); - //condition层级线程池 TODO + //condition层级线程池 + if (ObjectUtil.isNotEmpty(loopCondition.getThreadPoolExecutorClass())) { + parallelExecutor = getExecutorService(loopCondition.getThreadPoolExecutorClass(), + String.valueOf(loopCondition.hashCode())); - //chain层级线程池 - if (ObjectUtil.isNotEmpty(chain.getThreadPoolExecutorClass())) { + } else if (ObjectUtil.isNotEmpty(chain.getThreadPoolExecutorClass())) { //chain层级线程池 parallelExecutor = getExecutorService(chain.getThreadPoolExecutorClass(), String.valueOf(chain.hashCode())); + } else { //全局线程池 parallelExecutor = getExecutorService(Optional.ofNullable(liteflowConfig.getParallelLoopExecutorClass()) .orElse(liteflowConfig.getGlobalThreadPoolExecutorClass())); + } + return parallelExecutor; } diff --git a/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/chainThreadPool/ConditionThreadPoolELSpringbootTest.java b/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/chainThreadPool/ConditionThreadPoolELSpringbootTest.java index 1821e4dca..6b3dbdf73 100644 --- a/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/chainThreadPool/ConditionThreadPoolELSpringbootTest.java +++ b/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/chainThreadPool/ConditionThreadPoolELSpringbootTest.java @@ -50,7 +50,7 @@ public class ConditionThreadPoolELSpringbootTest extends BaseTest { LiteflowResponse response1 = flowExecutor.execute2Resp("chain2", "arg"); DefaultContext context = response1.getFirstContextBean(); Assertions.assertTrue(response1.isSuccess()); - Assertions.assertTrue(context.getData("threadName").toString().startsWith("customer-chain-thead")); + Assertions.assertTrue(context.getData("threadName").toString().startsWith("customer-loop-thead")); } /** @@ -61,7 +61,7 @@ public class ConditionThreadPoolELSpringbootTest extends BaseTest { LiteflowResponse response1 = flowExecutor.execute2Resp("chain3", "arg"); DefaultContext context = response1.getFirstContextBean(); Assertions.assertTrue(response1.isSuccess()); - Assertions.assertTrue(context.getData("threadName").toString().startsWith("customer-chain-thead")); + Assertions.assertTrue(context.getData("threadName").toString().startsWith("customer-loop-thead")); } /** @@ -73,7 +73,7 @@ public class ConditionThreadPoolELSpringbootTest extends BaseTest { LiteflowResponse response1 = flowExecutor.execute2Resp("chain4", list); DefaultContext context = response1.getFirstContextBean(); Assertions.assertTrue(response1.isSuccess()); - Assertions.assertTrue(context.getData("threadName").toString().startsWith("customer-chain-thead")); + Assertions.assertTrue(context.getData("threadName").toString().startsWith("customer-loop-thead")); } } diff --git a/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/chainThreadPool/CustomLoopThreadExecutor.java b/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/chainThreadPool/CustomLoopThreadExecutor.java new file mode 100644 index 000000000..63d3222e4 --- /dev/null +++ b/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/chainThreadPool/CustomLoopThreadExecutor.java @@ -0,0 +1,23 @@ +package com.yomahub.liteflow.test.chainThreadPool; + +import cn.hutool.core.util.ObjectUtil; +import com.yomahub.liteflow.property.LiteflowConfig; +import com.yomahub.liteflow.property.LiteflowConfigGetter; +import com.yomahub.liteflow.thread.ExecutorBuilder; + +import java.util.concurrent.ExecutorService; + +public class CustomLoopThreadExecutor implements ExecutorBuilder { + + @Override + public ExecutorService buildExecutor() { + LiteflowConfig liteflowConfig = LiteflowConfigGetter.get(); + // 只有在非spring的场景下liteflowConfig才会为null + if (ObjectUtil.isNull(liteflowConfig)) { + liteflowConfig = new LiteflowConfig(); + } + return buildDefaultExecutor(16, 16, + 512, "customer-loop-thead"); + } + +} diff --git a/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/resources/chainThreadPool/flow2.el.xml b/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/resources/chainThreadPool/flow2.el.xml index 4b280ba91..741e264ba 100644 --- a/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/resources/chainThreadPool/flow2.el.xml +++ b/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/resources/chainThreadPool/flow2.el.xml @@ -7,18 +7,16 @@ - FOR(5).parallel(true).DO(THEN(a,f - ) - ); + FOR(5).parallel(true).DO(THEN(a,f)).threadPool("com.yomahub.liteflow.test.chainThreadPool.CustomLoopThreadExecutor"); - WHILE(z).parallel(true).DO(THEN(w,d)); + WHILE(z).parallel(true).DO(THEN(w,d)).threadPool("com.yomahub.liteflow.test.chainThreadPool.CustomLoopThreadExecutor"); - ITERATOR(it).parallel(true).DO(THEN(a,i)); + ITERATOR(it).parallel(true).DO(THEN(a,i)).threadPool("com.yomahub.liteflow.test.chainThreadPool.CustomLoopThreadExecutor"); \ No newline at end of file