diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/ParallelStrategyExecutor.java b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/ParallelStrategyExecutor.java index bbdd44c41..109055a53 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/ParallelStrategyExecutor.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/parallel/strategy/ParallelStrategyExecutor.java @@ -140,7 +140,7 @@ public abstract class ParallelStrategyExecutor { String chainId = DataBus.getSlot(slotIndex).getChainId(); Chain chain = FlowBus.getChain(chainId); parallelExecutor = - ExecutorHelper.loadInstance().buildWhenExecutorWithHash(whenCondition.getThreadExecutorClass(), + ExecutorHelper.loadInstance().buildChainExecutorWithHash(whenCondition.getThreadExecutorClass(), String.valueOf(chain.hashCode())); } else { parallelExecutor = ExecutorHelper.loadInstance().buildWhenExecutor(whenCondition.getThreadExecutorClass()); diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/property/LiteflowConfig.java b/liteflow-core/src/main/java/com/yomahub/liteflow/property/LiteflowConfig.java index c39ed27bc..5ddb3d2e0 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/property/LiteflowConfig.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/property/LiteflowConfig.java @@ -76,6 +76,12 @@ public class LiteflowConfig { // 异步线程池最大队列数量 private Integer whenQueueLimit; + // chain线程池最大线程数 + private Integer chainMaxWorkers; + + // chain线程池最大队列数量 + private Integer chainQueueLimit; + // 解析模式,一共有三种,具体看其定义 private ParseModeEnum parseMode; @@ -250,7 +256,6 @@ public class LiteflowConfig { public void setWhenMaxWorkers(Integer whenMaxWorkers) { this.whenMaxWorkers = whenMaxWorkers; } - public Integer getWhenQueueLimit() { if (ObjectUtil.isNull(whenQueueLimit)) { return 512; @@ -326,8 +331,8 @@ public class LiteflowConfig { } } - public void setChainThreadExecutorClass(String threadExecutorClass) { - this.threadExecutorClass = threadExecutorClass; + public void setChainThreadExecutorClass(String chainThreadExecutorClass) { + this.chainThreadExecutorClass = chainThreadExecutorClass; } public String getNodeExecutorClass() { @@ -540,4 +545,29 @@ public class LiteflowConfig { public void setChainThreadPoolIsolate(Boolean chainThreadPoolIsolate) { this.chainThreadPoolIsolate = chainThreadPoolIsolate; } + + public Integer getChainMaxWorkers() { + if (ObjectUtil.isNull(chainMaxWorkers)) { + return 16; + } else { + return chainMaxWorkers; + } + } + + public void setChainMaxWorkers(Integer chainMaxWorkers) { + this.chainMaxWorkers = chainMaxWorkers; + } + + public Integer getChainQueueLimit() { + if (ObjectUtil.isNull(chainMaxWorkers)) { + return 512; + } else { + return chainQueueLimit; + } + } + + public void setChainQueueLimit(Integer chainQueueLimit) { + this.chainQueueLimit = chainQueueLimit; + } + } diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/thread/ExecutorHelper.java b/liteflow-core/src/main/java/com/yomahub/liteflow/thread/ExecutorHelper.java index b283e11e5..f0a4a27d8 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/thread/ExecutorHelper.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/thread/ExecutorHelper.java @@ -134,11 +134,11 @@ public class ExecutorHelper { public ExecutorService buildLoopParallelExecutor(Integer slotIndex) { LiteflowConfig liteflowConfig = LiteflowConfigGetter.get(); //chain线程池 - if (BooleanUtil.isTrue(liteflowConfig.getWhenThreadPoolIsolate())) { + if (BooleanUtil.isTrue(liteflowConfig.getChainThreadPoolIsolate())) { //获取chain的hash String chainId = DataBus.getSlot(slotIndex).getChainId(); Chain chain = FlowBus.getChain(chainId); - return getExecutorService(liteflowConfig.getThreadExecutorClass(), String.valueOf(chain.hashCode())); + return getExecutorService(liteflowConfig.getChainThreadExecutorClass(), String.valueOf(chain.hashCode())); } return getExecutorService(liteflowConfig.getParallelLoopExecutorClass()); } @@ -183,26 +183,19 @@ public class ExecutorHelper { } } - // 构建when线程池 - clazz和condition的hash值共同作为缓存key + // 构建chain线程池 - clazz和condition的hash值共同作为缓存key public ExecutorService buildChainExecutorWithHash(String conditionHash) { LiteflowConfig liteflowConfig = LiteflowConfigGetter.get(); - return buildChainExecutorWithHash(liteflowConfig.getThreadExecutorClass(), conditionHash); + return buildChainExecutorWithHash(liteflowConfig.getChainThreadExecutorClass(), conditionHash); } - // 构建when线程池 - clazz和condition的hash值共同作为缓存key + // 构建chain线程池 - clazz和condition的hash值共同作为缓存key public ExecutorService buildChainExecutorWithHash(String clazz, String conditionHash) { if (StrUtil.isBlank(clazz)) { - return buildWhenExecutorWithHash(conditionHash); + return buildChainExecutorWithHash(conditionHash); } return getExecutorService(clazz, conditionHash); } - // 构建when线程池 - 支持多个when公用一个线程池 - public ExecutorService buildChainExecutor(String clazz) { - if (StrUtil.isBlank(clazz)) { - return buildWhenExecutor(); - } - return getExecutorService(clazz); - } } diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/thread/LiteFlowDefaultChainExecutorBuilder.java b/liteflow-core/src/main/java/com/yomahub/liteflow/thread/LiteFlowDefaultChainExecutorBuilder.java index 45f0a517b..3148a08cf 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/thread/LiteFlowDefaultChainExecutorBuilder.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/thread/LiteFlowDefaultChainExecutorBuilder.java @@ -7,10 +7,7 @@ import com.yomahub.liteflow.property.LiteflowConfigGetter; import java.util.concurrent.ExecutorService; /** - * LiteFlow默认的并行多线程执行器实现 - * - * @author Bryan.Zhang - * @since 2.6.6 + * LiteFlow默认的chain多线程执行器实现 */ public class LiteFlowDefaultChainExecutorBuilder implements ExecutorBuilder { @@ -21,8 +18,8 @@ public class LiteFlowDefaultChainExecutorBuilder implements ExecutorBuilder { if (ObjectUtil.isNull(liteflowConfig)) { liteflowConfig = new LiteflowConfig(); } - return buildDefaultExecutor(liteflowConfig.getWhenMaxWorkers(), liteflowConfig.getWhenMaxWorkers(), - liteflowConfig.getWhenQueueLimit(), "chain-thread-"); + return buildDefaultExecutor(liteflowConfig.getChainMaxWorkers(), liteflowConfig.getChainMaxWorkers(), + liteflowConfig.getChainQueueLimit(), "chain-thread-"); } } diff --git a/liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/LiteflowProperty.java b/liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/LiteflowProperty.java index c829fd2a9..34e3c434d 100644 --- a/liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/LiteflowProperty.java +++ b/liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/LiteflowProperty.java @@ -105,6 +105,15 @@ public class LiteflowProperty { // 每一个chain里的when和异步循环合并起来都用单独的线程池。也就是说定义了多少个chain,就有多少个线程池 private Boolean chainThreadPoolIsolate; + // chain线程池最大线程数 + private int chainMaxWorkers; + + // chain线程池最大队列数量 + private int chainQueueLimit; + + // chain线程执行器class路径 + private String chainThreadExecutorClass; + public boolean isEnableMonitorFile() { return enableMonitorFile; } @@ -348,4 +357,29 @@ public class LiteflowProperty { public boolean isChainThreadPoolIsolate() { return chainThreadPoolIsolate; } + + public int getChainMaxWorkers() { + return chainMaxWorkers; + } + + public void setChainMaxWorkers(int chainMaxWorkers) { + this.chainMaxWorkers = chainMaxWorkers; + } + + public int getChainQueueLimit() { + return chainQueueLimit; + } + + public void setChainQueueLimit(int chainQueueLimit) { + this.chainQueueLimit = chainQueueLimit; + } + + public String getChainThreadExecutorClass() { + return chainThreadExecutorClass; + } + + public void setChainThreadExecutorClass(String chainThreadExecutorClass) { + this.chainThreadExecutorClass = chainThreadExecutorClass; + } + } diff --git a/liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/config/LiteflowPropertyAutoConfiguration.java b/liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/config/LiteflowPropertyAutoConfiguration.java index a4dc8ef10..4976f0eca 100644 --- a/liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/config/LiteflowPropertyAutoConfiguration.java +++ b/liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/config/LiteflowPropertyAutoConfiguration.java @@ -55,6 +55,9 @@ public class LiteflowPropertyAutoConfiguration { liteflowConfig.setPeriod(liteflowMonitorProperty.getPeriod()); liteflowConfig.setScriptSetting(property.getScriptSetting()); liteflowConfig.setChainThreadPoolIsolate(property.isChainThreadPoolIsolate()); + liteflowConfig.setChainThreadExecutorClass(property.getChainThreadExecutorClass()); + liteflowConfig.setChainMaxWorkers(property.getChainMaxWorkers()); + liteflowConfig.setChainQueueLimit(property.getChainQueueLimit()); return liteflowConfig; } diff --git a/liteflow-spring-boot-starter/src/main/resources/META-INF/liteflow-default.properties b/liteflow-spring-boot-starter/src/main/resources/META-INF/liteflow-default.properties index 83db19650..f97867ad5 100644 --- a/liteflow-spring-boot-starter/src/main/resources/META-INF/liteflow-default.properties +++ b/liteflow-spring-boot-starter/src/main/resources/META-INF/liteflow-default.properties @@ -9,6 +9,8 @@ liteflow.when-max-wait-time=15000 liteflow.when-max-wait-time-unit=MILLISECONDS liteflow.when-max-workers=16 liteflow.when-queue-limit=512 +liteflow.chain-max-workers=16 +liteflow.chain-queue-limit=512 liteflow.when-thread-pool-isolate=false liteflow.chain-thread-pool-isolate=false liteflow.parse-mode=PARSE_ALL_ON_START diff --git a/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/chainThreadPool/CustomChainThreadPoolELSpringbootTest.java b/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/chainThreadPool/CustomChainThreadPoolELSpringbootTest.java new file mode 100644 index 000000000..b98d4bbda --- /dev/null +++ b/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/chainThreadPool/CustomChainThreadPoolELSpringbootTest.java @@ -0,0 +1,92 @@ +package com.yomahub.liteflow.test.chainThreadPool; + +import cn.hutool.core.collection.ListUtil; +import com.yomahub.liteflow.core.FlowExecutor; +import com.yomahub.liteflow.flow.LiteflowResponse; +import com.yomahub.liteflow.slot.DefaultContext; +import com.yomahub.liteflow.test.BaseTest; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.context.annotation.ComponentScan; +import org.springframework.test.context.TestPropertySource; + +import javax.annotation.Resource; +import java.util.List; + +/** + * springboot环境下chain线程池隔离测试 + */ +@TestPropertySource(value = "classpath:/chainThreadPool/application2.properties") +@SpringBootTest(classes = CustomChainThreadPoolELSpringbootTest.class) +@EnableAutoConfiguration +@ComponentScan({"com.yomahub.liteflow.test.chainThreadPool.cmp"}) +public class CustomChainThreadPoolELSpringbootTest extends BaseTest { + + private final Logger log = LoggerFactory.getLogger(this.getClass()); + + @Resource + private FlowExecutor flowExecutor; + + /** + * 测试chain自定义线程池隔离 + */ + @Test + public void testCustomChainThreadPool1() { + LiteflowResponse response = flowExecutor.execute2Resp("chain", "arg"); + DefaultContext context = response.getFirstContextBean(); + Assertions.assertTrue(response.isSuccess()); + Assertions.assertTrue(context.getData("threadNameFor").toString().startsWith("customer-chain-thead-1")); + Assertions.assertTrue(context.getData("threadName").toString().startsWith("customer-chain-thead-1")); + } + + /** + * 测试when上自定义线程池和chain线程池隔离-优先以when上为准 + */ + @Test + public void testCustomChainThreadPool2() { + LiteflowResponse response1 = flowExecutor.execute2Resp("chain2", "arg"); + DefaultContext context = response1.getFirstContextBean(); + Assertions.assertTrue(response1.isSuccess()); + Assertions.assertTrue(context.getData("threadName").toString().startsWith("customer-chain-thead-2")); + } + + /** + * 测试并行FOR循环全局线程池和chain线程池隔离-优先以chain线程池上为准 + */ + @Test + public void testCustomChainThreadPool3() { + LiteflowResponse response1 = flowExecutor.execute2Resp("chain3", "arg"); + DefaultContext context = response1.getFirstContextBean(); + Assertions.assertTrue(response1.isSuccess()); + Assertions.assertTrue(context.getData("threadNameFor").toString().startsWith("customer-chain-thead-1")); + } + + /** + * 测试并行条件循环全局线程池和chain线程池隔离-优先以chain线程池上为准 + */ + @Test + public void testCustomChainThreadPool4() { + LiteflowResponse response1 = flowExecutor.execute2Resp("chain4", "arg"); + DefaultContext context = response1.getFirstContextBean(); + Assertions.assertTrue(response1.isSuccess()); + Assertions.assertTrue(context.getData("threadNameWhile").toString().startsWith("customer-chain-thead-1")); + } + + /** + * 测试并行迭代循环全局线程池和chain线程池隔离-优先以chain线程池上为准 + */ + @Test + public void testCustomChainThreadPool5() { + List list = ListUtil.toList("1", "2", "3", "4", "5"); + LiteflowResponse response1 = flowExecutor.execute2Resp("chain5", list); + DefaultContext context = response1.getFirstContextBean(); + Assertions.assertTrue(response1.isSuccess()); + Assertions.assertTrue(context.getData("threadNameIterator").toString().startsWith("customer-chain-thead-1")); + } + + +} diff --git a/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/chainThreadPool/CustomThreadExecutor1.java b/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/chainThreadPool/CustomThreadExecutor1.java new file mode 100644 index 000000000..f19c37ec4 --- /dev/null +++ b/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/chainThreadPool/CustomThreadExecutor1.java @@ -0,0 +1,23 @@ +package com.yomahub.liteflow.test.chainThreadPool; + +import cn.hutool.core.util.ObjectUtil; +import com.yomahub.liteflow.property.LiteflowConfig; +import com.yomahub.liteflow.property.LiteflowConfigGetter; +import com.yomahub.liteflow.thread.ExecutorBuilder; + +import java.util.concurrent.ExecutorService; + +public class CustomThreadExecutor1 implements ExecutorBuilder { + + @Override + public ExecutorService buildExecutor() { + LiteflowConfig liteflowConfig = LiteflowConfigGetter.get(); + // 只有在非spring的场景下liteflowConfig才会为null + if (ObjectUtil.isNull(liteflowConfig)) { + liteflowConfig = new LiteflowConfig(); + } + return buildDefaultExecutor(liteflowConfig.getChainMaxWorkers(), liteflowConfig.getChainMaxWorkers(), + liteflowConfig.getChainQueueLimit(), "customer-chain-thead-1"); + } + +} diff --git a/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/chainThreadPool/CustomThreadExecutor2.java b/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/chainThreadPool/CustomThreadExecutor2.java new file mode 100644 index 000000000..4527fbd16 --- /dev/null +++ b/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/chainThreadPool/CustomThreadExecutor2.java @@ -0,0 +1,23 @@ +package com.yomahub.liteflow.test.chainThreadPool; + +import cn.hutool.core.util.ObjectUtil; +import com.yomahub.liteflow.property.LiteflowConfig; +import com.yomahub.liteflow.property.LiteflowConfigGetter; +import com.yomahub.liteflow.thread.ExecutorBuilder; + +import java.util.concurrent.ExecutorService; + +public class CustomThreadExecutor2 implements ExecutorBuilder { + + @Override + public ExecutorService buildExecutor() { + LiteflowConfig liteflowConfig = LiteflowConfigGetter.get(); + // 只有在非spring的场景下liteflowConfig才会为null + if (ObjectUtil.isNull(liteflowConfig)) { + liteflowConfig = new LiteflowConfig(); + } + return buildDefaultExecutor(liteflowConfig.getChainMaxWorkers(), liteflowConfig.getChainMaxWorkers(), + liteflowConfig.getChainQueueLimit(), "customer-chain-thead-2"); + } + +} diff --git a/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/chainThreadPool/DefaultChainThreadPoolELSpringbootTest.java b/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/chainThreadPool/DefaultChainThreadPoolELSpringbootTest.java new file mode 100644 index 000000000..5c31217aa --- /dev/null +++ b/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/chainThreadPool/DefaultChainThreadPoolELSpringbootTest.java @@ -0,0 +1,45 @@ +package com.yomahub.liteflow.test.chainThreadPool; + +import com.yomahub.liteflow.core.FlowExecutor; +import com.yomahub.liteflow.flow.LiteflowResponse; +import com.yomahub.liteflow.slot.DefaultContext; +import com.yomahub.liteflow.test.BaseTest; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.context.annotation.ComponentScan; +import org.springframework.test.context.TestPropertySource; + +import javax.annotation.Resource; + +/** + * springboot环境下chain线程池隔离测试 + */ +@TestPropertySource(value = "classpath:/chainThreadPool/application.properties") +@SpringBootTest(classes = DefaultChainThreadPoolELSpringbootTest.class) +@EnableAutoConfiguration +@ComponentScan({"com.yomahub.liteflow.test.chainThreadPool.cmp"}) +public class DefaultChainThreadPoolELSpringbootTest extends BaseTest { + + private final Logger log = LoggerFactory.getLogger(this.getClass()); + + @Resource + private FlowExecutor flowExecutor; + + /** + * 测试chain默认线程池隔离 + */ + @Test + public void testDefaultChainThreadPool() { + LiteflowResponse response = flowExecutor.execute2Resp("chain", "arg"); + DefaultContext context = response.getFirstContextBean(); + Assertions.assertTrue(response.isSuccess()); + Assertions.assertTrue(context.getData("threadNameFor").toString().startsWith("chain-thread-")); + Assertions.assertTrue(context.getData("threadName").toString().startsWith("chain-thread-")); + + } + +} diff --git a/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/chainThreadPool/cmp/ACmp.java b/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/chainThreadPool/cmp/ACmp.java new file mode 100644 index 000000000..774bb1b38 --- /dev/null +++ b/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/chainThreadPool/cmp/ACmp.java @@ -0,0 +1,22 @@ +/** + *

Title: liteflow

+ *

Description: 轻量级的组件式流程框架

+ * + * @author Bryan.Zhang + * @email weenyc31@163.com + * @Date 2020/4/1 + */ +package com.yomahub.liteflow.test.chainThreadPool.cmp; + +import com.yomahub.liteflow.core.NodeComponent; +import org.springframework.stereotype.Component; + +@Component("a") +public class ACmp extends NodeComponent { + + @Override + public void process() { + System.out.println("ACmp executed!"); + } + +} diff --git a/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/chainThreadPool/cmp/BCmp.java b/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/chainThreadPool/cmp/BCmp.java new file mode 100644 index 000000000..f15e585b7 --- /dev/null +++ b/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/chainThreadPool/cmp/BCmp.java @@ -0,0 +1,25 @@ +/** + *

Title: liteflow

+ *

Description: 轻量级的组件式流程框架

+ * + * @author Bryan.Zhang + * @email weenyc31@163.com + * @Date 2020/4/1 + */ +package com.yomahub.liteflow.test.chainThreadPool.cmp; + +import com.yomahub.liteflow.core.NodeComponent; +import com.yomahub.liteflow.slot.DefaultContext; +import org.springframework.stereotype.Component; + +@Component("b") +public class BCmp extends NodeComponent { + + @Override + public void process() { + DefaultContext context = this.getFirstContextBean(); + context.setData("threadName", Thread.currentThread().getName()); + System.out.println("BCmp executed!"); + } + +} diff --git a/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/chainThreadPool/cmp/DCmp.java b/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/chainThreadPool/cmp/DCmp.java new file mode 100644 index 000000000..0921b17b3 --- /dev/null +++ b/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/chainThreadPool/cmp/DCmp.java @@ -0,0 +1,30 @@ +/** + *

Title: liteflow

+ *

Description: 轻量级的组件式流程框架

+ * + * @author Bryan.Zhang + * @email weenyc31@163.com + * @Date 2020/4/1 + */ +package com.yomahub.liteflow.test.chainThreadPool.cmp; + +import com.yomahub.liteflow.core.NodeComponent; +import com.yomahub.liteflow.slot.DefaultContext; +import org.springframework.stereotype.Component; + +@Component("d") +public class DCmp extends NodeComponent { + + @Override + public void process() { + DefaultContext context = this.getFirstContextBean(); + String key = "test"; + if (context.hasData(key)) { + int count = context.getData(key); + context.setData(key, ++count); + } else { + context.setData(key, 1); + } + } + +} diff --git a/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/chainThreadPool/cmp/FCmp.java b/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/chainThreadPool/cmp/FCmp.java new file mode 100644 index 000000000..3e48aac9f --- /dev/null +++ b/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/chainThreadPool/cmp/FCmp.java @@ -0,0 +1,25 @@ +/** + *

Title: liteflow

+ *

Description: 轻量级的组件式流程框架

+ * + * @author Bryan.Zhang + * @email weenyc31@163.com + * @Date 2020/4/1 + */ +package com.yomahub.liteflow.test.chainThreadPool.cmp; + +import com.yomahub.liteflow.core.NodeComponent; +import com.yomahub.liteflow.slot.DefaultContext; +import org.springframework.stereotype.Component; + +@Component("f") +public class FCmp extends NodeComponent { + + @Override + public void process() { + DefaultContext context = this.getFirstContextBean(); + context.setData("threadNameFor", Thread.currentThread().getName()); + System.out.println("FCmp executed!"); + } + +} diff --git a/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/chainThreadPool/cmp/ICmp.java b/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/chainThreadPool/cmp/ICmp.java new file mode 100644 index 000000000..25424c581 --- /dev/null +++ b/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/chainThreadPool/cmp/ICmp.java @@ -0,0 +1,25 @@ +/** + *

Title: liteflow

+ *

Description: 轻量级的组件式流程框架

+ * + * @author Bryan.Zhang + * @email weenyc31@163.com + * @Date 2020/4/1 + */ +package com.yomahub.liteflow.test.chainThreadPool.cmp; + +import com.yomahub.liteflow.core.NodeComponent; +import com.yomahub.liteflow.slot.DefaultContext; +import org.springframework.stereotype.Component; + +@Component("i") +public class ICmp extends NodeComponent { + + @Override + public void process() { + DefaultContext context = this.getFirstContextBean(); + context.setData("threadNameIterator", Thread.currentThread().getName()); + System.out.println("ICmp executed!"); + } + +} diff --git a/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/chainThreadPool/cmp/ITCmp.java b/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/chainThreadPool/cmp/ITCmp.java new file mode 100644 index 000000000..6a20f11d7 --- /dev/null +++ b/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/chainThreadPool/cmp/ITCmp.java @@ -0,0 +1,17 @@ +package com.yomahub.liteflow.test.chainThreadPool.cmp; + +import com.yomahub.liteflow.core.NodeIteratorComponent; +import org.springframework.stereotype.Component; + +import java.util.Iterator; +import java.util.List; + +@Component("it") +public class ITCmp extends NodeIteratorComponent { + + @Override + public Iterator processIterator() throws Exception { + List list = this.getRequestData(); + return list.iterator(); + } +} diff --git a/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/chainThreadPool/cmp/WCmp.java b/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/chainThreadPool/cmp/WCmp.java new file mode 100644 index 000000000..8c394ddb5 --- /dev/null +++ b/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/chainThreadPool/cmp/WCmp.java @@ -0,0 +1,25 @@ +/** + *

Title: liteflow

+ *

Description: 轻量级的组件式流程框架

+ * + * @author Bryan.Zhang + * @email weenyc31@163.com + * @Date 2020/4/1 + */ +package com.yomahub.liteflow.test.chainThreadPool.cmp; + +import com.yomahub.liteflow.core.NodeComponent; +import com.yomahub.liteflow.slot.DefaultContext; +import org.springframework.stereotype.Component; + +@Component("w") +public class WCmp extends NodeComponent { + + @Override + public void process() { + DefaultContext context = this.getFirstContextBean(); + context.setData("threadNameWhile", Thread.currentThread().getName()); + System.out.println("WCmp executed!"); + } + +} diff --git a/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/chainThreadPool/cmp/ZCmp.java b/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/chainThreadPool/cmp/ZCmp.java new file mode 100644 index 000000000..4c608f1c0 --- /dev/null +++ b/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/chainThreadPool/cmp/ZCmp.java @@ -0,0 +1,21 @@ +package com.yomahub.liteflow.test.chainThreadPool.cmp; + +import com.yomahub.liteflow.core.NodeBooleanComponent; +import com.yomahub.liteflow.slot.DefaultContext; +import org.springframework.stereotype.Component; + +@Component("z") +public class ZCmp extends NodeBooleanComponent { + + @Override + public boolean processBoolean() throws Exception { + DefaultContext context = this.getFirstContextBean(); + String key = "test"; + if (context.hasData(key)) { + int count = context.getData("test"); + return count < 5; + } else { + return true; + } + } +} diff --git a/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/resources/chainThreadPool/application.properties b/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/resources/chainThreadPool/application.properties new file mode 100644 index 000000000..587e96142 --- /dev/null +++ b/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/resources/chainThreadPool/application.properties @@ -0,0 +1,4 @@ +liteflow.rule-source=chainThreadPool/flow.el.xml +liteflow.chain-thread-pool-isolate=true +liteflow.chain-max-workers=10 +liteflow.chain-queue-limit=1024 diff --git a/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/resources/chainThreadPool/application2.properties b/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/resources/chainThreadPool/application2.properties new file mode 100644 index 000000000..7a2dc1480 --- /dev/null +++ b/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/resources/chainThreadPool/application2.properties @@ -0,0 +1,5 @@ +liteflow.rule-source=chainThreadPool/flow2.el.xml +liteflow.chain-thread-pool-isolate=true +liteflow.chain-max-workers=10 +liteflow.chain-queue-limit=1024 +liteflow.chain-thread-executor-class=com.yomahub.liteflow.test.chainThreadPool.CustomThreadExecutor1 diff --git a/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/resources/chainThreadPool/flow.el.xml b/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/resources/chainThreadPool/flow.el.xml new file mode 100644 index 000000000..f15a3b45f --- /dev/null +++ b/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/resources/chainThreadPool/flow.el.xml @@ -0,0 +1,9 @@ + + + + FOR(5).parallel(true).DO(THEN(f,WHEN( + THEN(a,b) + )) + ); + + \ No newline at end of file diff --git a/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/resources/chainThreadPool/flow2.el.xml b/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/resources/chainThreadPool/flow2.el.xml new file mode 100644 index 000000000..c8a40aaaf --- /dev/null +++ b/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/resources/chainThreadPool/flow2.el.xml @@ -0,0 +1,27 @@ + + + + FOR(5).parallel(true).DO(THEN(f,WHEN( + THEN(a,b) + )) + ); + + + + WHEN(a, b).threadPool("com.yomahub.liteflow.test.chainThreadPool.CustomThreadExecutor2"); + + + + FOR(5).parallel(true).DO(THEN(a,f + ) + ); + + + + WHILE(z).parallel(true).DO(THEN(w,d)); + + + + ITERATOR(it).parallel(true).DO(THEN(a,i)); + + \ No newline at end of file