feature #IAPI07 chain维度线程池隔离

This commit is contained in:
jason
2024-10-21 21:10:08 +08:00
parent 35ea7356ff
commit f34b3e9cc5
204 changed files with 4437 additions and 378 deletions

View File

@@ -66,5 +66,9 @@
<artifactId>logback-classic</artifactId>
<version>1.2.11</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
</dependency>
</dependencies>
</project>

View File

@@ -0,0 +1,72 @@
package com.yomahub.liteflow.test.chainThreadPool;
import cn.hutool.core.collection.ListUtil;
import com.yomahub.liteflow.core.FlowExecutor;
import com.yomahub.liteflow.flow.LiteflowResponse;
import com.yomahub.liteflow.slot.DefaultContext;
import com.yomahub.liteflow.test.BaseTest;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.noear.solon.annotation.Import;
import org.noear.solon.annotation.Inject;
import org.noear.solon.test.SolonTest;
import java.util.List;
/**
* springboot环境下chain线程池隔离测试
*/
@SolonTest
@Import(profiles = "classpath:/chainThreadPool/application.properties")
public class ChainThreadPoolELSpringbootTest extends BaseTest {
@Inject
private FlowExecutor flowExecutor;
/**
* 测试WHEN上全局线程池和chain线程池隔离-优先以chain上为准
*/
@Test
public void testChainThreadPool() {
LiteflowResponse response1 = flowExecutor.execute2Resp("chain1", "arg");
DefaultContext context = response1.getFirstContextBean();
Assertions.assertTrue(response1.isSuccess());
Assertions.assertTrue(context.getData("threadName").toString().startsWith("customer-chain-thead"));
}
/**
* 测试FOR上全局线程池和chain线程池隔离-优先以chain上为准
*/
@Test
public void testChainThreadPool2() {
LiteflowResponse response1 = flowExecutor.execute2Resp("chain2", "arg");
DefaultContext context = response1.getFirstContextBean();
Assertions.assertTrue(response1.isSuccess());
Assertions.assertTrue(context.getData("threadName").toString().startsWith("customer-chain-thead"));
}
/**
* 测试WHILE上全局线程池和chain线程池隔离-优先以chain上为准
*/
@Test
public void testChainThreadPool3() {
LiteflowResponse response1 = flowExecutor.execute2Resp("chain3", "arg");
DefaultContext context = response1.getFirstContextBean();
Assertions.assertTrue(response1.isSuccess());
Assertions.assertTrue(context.getData("threadName").toString().startsWith("customer-chain-thead"));
}
/**
* 测试ITERATOR上全局线程池和chain线程池隔离-优先以chain上为准
*/
@Test
public void testChainThreadPool4() {
List<String> list = ListUtil.toList("1", "2", "3", "4", "5");
LiteflowResponse response1 = flowExecutor.execute2Resp("chain4", list);
DefaultContext context = response1.getFirstContextBean();
Assertions.assertTrue(response1.isSuccess());
Assertions.assertTrue(context.getData("threadName").toString().startsWith("customer-chain-thead"));
}
}

View File

@@ -0,0 +1,73 @@
package com.yomahub.liteflow.test.chainThreadPool;
import cn.hutool.core.collection.ListUtil;
import com.yomahub.liteflow.core.FlowExecutor;
import com.yomahub.liteflow.flow.LiteflowResponse;
import com.yomahub.liteflow.slot.DefaultContext;
import com.yomahub.liteflow.test.BaseTest;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.noear.solon.annotation.Import;
import org.noear.solon.annotation.Inject;
import org.noear.solon.test.SolonTest;
import java.util.List;
/**
* springboot环境下chain线程池隔离测试
*/
@SolonTest
@Import(profiles = "classpath:/chainThreadPool/application2.properties")
public class ConditionThreadPoolELSpringbootTest extends BaseTest {
@Inject
private FlowExecutor flowExecutor;
/**
* 测试WEHN上condition线程池和chain线程池隔离-优先以WHEN上为准
*/
@Test
public void testConditionThreadPool() {
LiteflowResponse response1 = flowExecutor.execute2Resp("chain1", "arg");
DefaultContext context = response1.getFirstContextBean();
Assertions.assertTrue(response1.isSuccess());
Assertions.assertTrue(context.getData("threadName").toString().startsWith("customer-when-thead"));
}
/**
* 测试FOR上condition线程池和chain线程池隔离-优先以FOR上为准
*/
@Test
public void testConditionThreadPool2() {
LiteflowResponse response1 = flowExecutor.execute2Resp("chain2", "arg");
DefaultContext context = response1.getFirstContextBean();
Assertions.assertTrue(response1.isSuccess());
Assertions.assertTrue(context.getData("threadName").toString().startsWith("customer-loop-thead"));
}
/**
* 测试WHILE上condition线程池和chain线程池隔离-优先以WHILE上为准
*/
@Test
public void testConditionThreadPool3() {
LiteflowResponse response1 = flowExecutor.execute2Resp("chain3", "arg");
DefaultContext context = response1.getFirstContextBean();
Assertions.assertTrue(response1.isSuccess());
Assertions.assertTrue(context.getData("threadName").toString().startsWith("customer-loop-thead"));
}
/**
* 测试ITERATOR上condition线程池和chain线程池隔离-优先以ITERATOR上为准
*/
@Test
public void testConditionThreadPool4() {
List<String> list = ListUtil.toList("1", "2", "3", "4", "5");
LiteflowResponse response1 = flowExecutor.execute2Resp("chain4", list);
DefaultContext context = response1.getFirstContextBean();
Assertions.assertTrue(response1.isSuccess());
Assertions.assertTrue(context.getData("threadName").toString().startsWith("customer-loop-thead"));
}
}

View File

@@ -0,0 +1,23 @@
package com.yomahub.liteflow.test.chainThreadPool;
import cn.hutool.core.util.ObjectUtil;
import com.yomahub.liteflow.property.LiteflowConfig;
import com.yomahub.liteflow.property.LiteflowConfigGetter;
import com.yomahub.liteflow.thread.ExecutorBuilder;
import java.util.concurrent.ExecutorService;
public class CustomChainThreadExecutor implements ExecutorBuilder {
@Override
public ExecutorService buildExecutor() {
LiteflowConfig liteflowConfig = LiteflowConfigGetter.get();
// 只有在非spring的场景下liteflowConfig才会为null
if (ObjectUtil.isNull(liteflowConfig)) {
liteflowConfig = new LiteflowConfig();
}
return buildDefaultExecutor(16, 16,
512, "customer-chain-thead");
}
}

View File

@@ -0,0 +1,23 @@
package com.yomahub.liteflow.test.chainThreadPool;
import cn.hutool.core.util.ObjectUtil;
import com.yomahub.liteflow.property.LiteflowConfig;
import com.yomahub.liteflow.property.LiteflowConfigGetter;
import com.yomahub.liteflow.thread.ExecutorBuilder;
import java.util.concurrent.ExecutorService;
public class CustomGlobalThreadExecutor implements ExecutorBuilder {
@Override
public ExecutorService buildExecutor() {
LiteflowConfig liteflowConfig = LiteflowConfigGetter.get();
// 只有在非spring的场景下liteflowConfig才会为null
if (ObjectUtil.isNull(liteflowConfig)) {
liteflowConfig = new LiteflowConfig();
}
return buildDefaultExecutor(16, 16,
512, "customer-global-thead");
}
}

View File

@@ -0,0 +1,23 @@
package com.yomahub.liteflow.test.chainThreadPool;
import cn.hutool.core.util.ObjectUtil;
import com.yomahub.liteflow.property.LiteflowConfig;
import com.yomahub.liteflow.property.LiteflowConfigGetter;
import com.yomahub.liteflow.thread.ExecutorBuilder;
import java.util.concurrent.ExecutorService;
public class CustomLoopThreadExecutor implements ExecutorBuilder {
@Override
public ExecutorService buildExecutor() {
LiteflowConfig liteflowConfig = LiteflowConfigGetter.get();
// 只有在非spring的场景下liteflowConfig才会为null
if (ObjectUtil.isNull(liteflowConfig)) {
liteflowConfig = new LiteflowConfig();
}
return buildDefaultExecutor(16, 16,
512, "customer-loop-thead");
}
}

View File

@@ -0,0 +1,23 @@
package com.yomahub.liteflow.test.chainThreadPool;
import cn.hutool.core.util.ObjectUtil;
import com.yomahub.liteflow.property.LiteflowConfig;
import com.yomahub.liteflow.property.LiteflowConfigGetter;
import com.yomahub.liteflow.thread.ExecutorBuilder;
import java.util.concurrent.ExecutorService;
public class CustomWhenThreadExecutor implements ExecutorBuilder {
@Override
public ExecutorService buildExecutor() {
LiteflowConfig liteflowConfig = LiteflowConfigGetter.get();
// 只有在非spring的场景下liteflowConfig才会为null
if (ObjectUtil.isNull(liteflowConfig)) {
liteflowConfig = new LiteflowConfig();
}
return buildDefaultExecutor(16, 16,
512, "customer-when-thead");
}
}

View File

@@ -0,0 +1,71 @@
package com.yomahub.liteflow.test.chainThreadPool;
import cn.hutool.core.collection.ListUtil;
import com.yomahub.liteflow.core.FlowExecutor;
import com.yomahub.liteflow.flow.LiteflowResponse;
import com.yomahub.liteflow.slot.DefaultContext;
import com.yomahub.liteflow.test.BaseTest;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.noear.solon.annotation.Import;
import org.noear.solon.annotation.Inject;
import org.noear.solon.test.SolonTest;
import java.util.List;
/**
* springboot环境下Global线程池隔离测试
*/
@SolonTest
@Import(profiles = "classpath:/chainThreadPool/application3.properties")
public class GlobalThreadPoolELSpringbootTest extends BaseTest {
@Inject
private FlowExecutor flowExecutor;
/**
* 测试WHEN上全局线程池
*/
@Test
public void testGlobalThreadPool() {
LiteflowResponse response1 = flowExecutor.execute2Resp("chain1", "arg");
DefaultContext context = response1.getFirstContextBean();
Assertions.assertTrue(response1.isSuccess());
Assertions.assertTrue(context.getData("threadName").toString().startsWith("customer-global-thead"));
}
/**
* 测试FOR上全局线程池
*/
@Test
public void testGlobalThreadPool2() {
LiteflowResponse response1 = flowExecutor.execute2Resp("chain2", "arg");
DefaultContext context = response1.getFirstContextBean();
Assertions.assertTrue(response1.isSuccess());
Assertions.assertTrue(context.getData("threadName").toString().startsWith("customer-global-thead"));
}
/**
* 测试WHILE上全局线程池
*/
@Test
public void testGlobalThreadPool3() {
LiteflowResponse response1 = flowExecutor.execute2Resp("chain3", "arg");
DefaultContext context = response1.getFirstContextBean();
Assertions.assertTrue(response1.isSuccess());
Assertions.assertTrue(context.getData("threadName").toString().startsWith("customer-global-thead"));
}
/**
* 测试ITERATOR上全局线程池
*/
@Test
public void testGlobalThreadPool4() {
List<String> list = ListUtil.toList("1", "2", "3", "4", "5");
LiteflowResponse response1 = flowExecutor.execute2Resp("chain4", list);
DefaultContext context = response1.getFirstContextBean();
Assertions.assertTrue(response1.isSuccess());
Assertions.assertTrue(context.getData("threadName").toString().startsWith("customer-global-thead"));
}
}

View File

@@ -0,0 +1,22 @@
/**
* <p>Title: liteflow</p>
* <p>Description: 轻量级的组件式流程框架</p>
*
* @author Bryan.Zhang
* @email weenyc31@163.com
* @Date 2020/4/1
*/
package com.yomahub.liteflow.test.chainThreadPool.cmp;
import com.yomahub.liteflow.core.NodeComponent;
import org.noear.solon.annotation.Component;
@Component("a")
public class ACmp extends NodeComponent {
@Override
public void process() {
System.out.println("ACmp executed!");
}
}

View File

@@ -0,0 +1,25 @@
/**
* <p>Title: liteflow</p>
* <p>Description: 轻量级的组件式流程框架</p>
*
* @author Bryan.Zhang
* @email weenyc31@163.com
* @Date 2020/4/1
*/
package com.yomahub.liteflow.test.chainThreadPool.cmp;
import com.yomahub.liteflow.core.NodeComponent;
import com.yomahub.liteflow.slot.DefaultContext;
import org.noear.solon.annotation.Component;
@Component("b")
public class BCmp extends NodeComponent {
@Override
public void process() {
DefaultContext context = this.getFirstContextBean();
context.setData("threadName", Thread.currentThread().getName());
System.out.println("BCmp executed!");
}
}

View File

@@ -0,0 +1,30 @@
/**
* <p>Title: liteflow</p>
* <p>Description: 轻量级的组件式流程框架</p>
*
* @author Bryan.Zhang
* @email weenyc31@163.com
* @Date 2020/4/1
*/
package com.yomahub.liteflow.test.chainThreadPool.cmp;
import com.yomahub.liteflow.core.NodeComponent;
import com.yomahub.liteflow.slot.DefaultContext;
import org.noear.solon.annotation.Component;
@Component("d")
public class DCmp extends NodeComponent {
@Override
public void process() {
DefaultContext context = this.getFirstContextBean();
String key = "test";
if (context.hasData(key)) {
int count = context.getData(key);
context.setData(key, ++count);
} else {
context.setData(key, 1);
}
}
}

View File

@@ -0,0 +1,25 @@
/**
* <p>Title: liteflow</p>
* <p>Description: 轻量级的组件式流程框架</p>
*
* @author Bryan.Zhang
* @email weenyc31@163.com
* @Date 2020/4/1
*/
package com.yomahub.liteflow.test.chainThreadPool.cmp;
import com.yomahub.liteflow.core.NodeComponent;
import com.yomahub.liteflow.slot.DefaultContext;
import org.noear.solon.annotation.Component;
@Component("f")
public class FCmp extends NodeComponent {
@Override
public void process() {
DefaultContext context = this.getFirstContextBean();
context.setData("threadName", Thread.currentThread().getName());
System.out.println("FCmp executed!");
}
}

View File

@@ -0,0 +1,25 @@
/**
* <p>Title: liteflow</p>
* <p>Description: 轻量级的组件式流程框架</p>
*
* @author Bryan.Zhang
* @email weenyc31@163.com
* @Date 2020/4/1
*/
package com.yomahub.liteflow.test.chainThreadPool.cmp;
import com.yomahub.liteflow.core.NodeComponent;
import com.yomahub.liteflow.slot.DefaultContext;
import org.noear.solon.annotation.Component;
@Component("i")
public class ICmp extends NodeComponent {
@Override
public void process() {
DefaultContext context = this.getFirstContextBean();
context.setData("threadName", Thread.currentThread().getName());
System.out.println("ICmp executed!");
}
}

View File

@@ -0,0 +1,17 @@
package com.yomahub.liteflow.test.chainThreadPool.cmp;
import com.yomahub.liteflow.core.NodeIteratorComponent;
import org.noear.solon.annotation.Component;
import java.util.Iterator;
import java.util.List;
@Component("it")
public class ITCmp extends NodeIteratorComponent {
@Override
public Iterator<?> processIterator() throws Exception {
List<String> list = this.getRequestData();
return list.iterator();
}
}

View File

@@ -0,0 +1,25 @@
/**
* <p>Title: liteflow</p>
* <p>Description: 轻量级的组件式流程框架</p>
*
* @author Bryan.Zhang
* @email weenyc31@163.com
* @Date 2020/4/1
*/
package com.yomahub.liteflow.test.chainThreadPool.cmp;
import com.yomahub.liteflow.core.NodeComponent;
import com.yomahub.liteflow.slot.DefaultContext;
import org.noear.solon.annotation.Component;
@Component("w")
public class WCmp extends NodeComponent {
@Override
public void process() {
DefaultContext context = this.getFirstContextBean();
context.setData("threadName", Thread.currentThread().getName());
System.out.println("WCmp executed!");
}
}

View File

@@ -0,0 +1,21 @@
package com.yomahub.liteflow.test.chainThreadPool.cmp;
import com.yomahub.liteflow.core.NodeBooleanComponent;
import com.yomahub.liteflow.slot.DefaultContext;
import org.noear.solon.annotation.Component;
@Component("z")
public class ZCmp extends NodeBooleanComponent {
@Override
public boolean processBoolean() throws Exception {
DefaultContext context = this.getFirstContextBean();
String key = "test";
if (context.hasData(key)) {
int count = context.getData("test");
return count < 5;
} else {
return true;
}
}
}

View File

@@ -16,8 +16,8 @@ public class CustomThreadExecutor1 implements ExecutorBuilder {
if (ObjectUtil.isNull(liteflowConfig)) {
liteflowConfig = new LiteflowConfig();
}
return buildDefaultExecutor(liteflowConfig.getWhenMaxWorkers(), liteflowConfig.getWhenMaxWorkers(),
liteflowConfig.getWhenQueueLimit(), "customer-when-1-thead-");
return buildDefaultExecutor(liteflowConfig.getGlobalThreadPoolSize(), liteflowConfig.getGlobalThreadPoolSize(),
liteflowConfig.getGlobalThreadPoolQueueSize(), "customer-when-1-thead-");
}
}

View File

@@ -16,8 +16,8 @@ public class CustomThreadExecutor2 implements ExecutorBuilder {
if (ObjectUtil.isNull(liteflowConfig)) {
liteflowConfig = new LiteflowConfig();
}
return buildDefaultExecutor(liteflowConfig.getWhenMaxWorkers(), liteflowConfig.getWhenMaxWorkers(),
liteflowConfig.getWhenQueueLimit(), "customer-when-2-thead-");
return buildDefaultExecutor(liteflowConfig.getGlobalThreadPoolSize(), liteflowConfig.getGlobalThreadPoolSize(),
liteflowConfig.getGlobalThreadPoolQueueSize(), "customer-when-2-thead-");
}
}

View File

@@ -16,8 +16,8 @@ public class CustomThreadExecutor3 implements ExecutorBuilder {
if (ObjectUtil.isNull(liteflowConfig)) {
liteflowConfig = new LiteflowConfig();
}
return buildDefaultExecutor(liteflowConfig.getWhenMaxWorkers(), liteflowConfig.getWhenMaxWorkers(),
liteflowConfig.getWhenQueueLimit(), "customer-when-3-thead-");
return buildDefaultExecutor(liteflowConfig.getGlobalThreadPoolSize(), liteflowConfig.getGlobalThreadPoolSize(),
liteflowConfig.getGlobalThreadPoolQueueSize(), "customer-when-3-thead-");
}
}

View File

@@ -6,7 +6,6 @@ import com.yomahub.liteflow.slot.DefaultContext;
import com.yomahub.liteflow.test.BaseTest;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.noear.solon.annotation.Import;
import org.noear.solon.annotation.Inject;
import org.noear.solon.test.SolonTest;
@@ -36,7 +35,7 @@ public class CustomWhenThreadPoolELSpringbootTest extends BaseTest {
LiteflowResponse response = flowExecutor.execute2Resp("chain", "arg");
DefaultContext context = response.getFirstContextBean();
Assertions.assertTrue(response.isSuccess());
Assertions.assertTrue(context.getData("threadName").toString().startsWith("when-thread-1"));
Assertions.assertTrue(context.getData("threadName").toString().startsWith("global-thread-1"));
}
/**

View File

@@ -16,8 +16,8 @@ public class CustomThreadExecutor implements ExecutorBuilder {
if (ObjectUtil.isNull(liteflowConfig)) {
liteflowConfig = new LiteflowConfig();
}
return buildDefaultExecutor(liteflowConfig.getParallelMaxWorkers(), liteflowConfig.getParallelMaxWorkers(),
liteflowConfig.getParallelQueueLimit(), "customer-loop-thead-");
return buildDefaultExecutor(liteflowConfig.getGlobalThreadPoolSize(), liteflowConfig.getGlobalThreadPoolSize(),
liteflowConfig.getGlobalThreadPoolQueueSize(), "customer-loop-thead-");
}
}

View File

@@ -0,0 +1 @@
liteflow.rule-source=chainThreadPool/flow.el.xml

View File

@@ -0,0 +1,2 @@
liteflow.rule-source=chainThreadPool/flow2.el.xml
liteflow.when-thread-pool-isolate=true

View File

@@ -0,0 +1,4 @@
liteflow.rule-source=chainThreadPool/flow3.el.xml
liteflow.global-thread-pool-size=16
liteflow.global-thread-pool-queue-size=512
liteflow.global-thread-pool-executor-class=com.yomahub.liteflow.test.chainThreadPool.CustomGlobalThreadExecutor

View File

@@ -0,0 +1,22 @@
<?xml version="1.0" encoding="UTF-8"?>
<flow>
<chain name="chain1"
thread-pool-executor-class="com.yomahub.liteflow.test.chainThreadPool.CustomChainThreadExecutor">
WHEN(a,b);
</chain>
<chain name="chain2"
thread-pool-executor-class="com.yomahub.liteflow.test.chainThreadPool.CustomChainThreadExecutor">
FOR(5).parallel(true).DO(THEN(a,f));
</chain>
<chain name="chain3"
thread-pool-executor-class="com.yomahub.liteflow.test.chainThreadPool.CustomChainThreadExecutor">
WHILE(z).parallel(true).DO(THEN(w,d));
</chain>
<chain name="chain4"
thread-pool-executor-class="com.yomahub.liteflow.test.chainThreadPool.CustomChainThreadExecutor">
ITERATOR(it).parallel(true).DO(THEN(a,i));
</chain>
</flow>

View File

@@ -0,0 +1,22 @@
<?xml version="1.0" encoding="UTF-8"?>
<flow>
<chain name="chain1"
thread-pool-executor-class="com.yomahub.liteflow.test.chainThreadPool.CustomChainThreadExecutor">
WHEN(a,b).threadPool("com.yomahub.liteflow.test.chainThreadPool.CustomWhenThreadExecutor");
</chain>
<chain name="chain2"
thread-pool-executor-class="com.yomahub.liteflow.test.chainThreadPool.CustomChainThreadExecutor">
FOR(5).parallel(true).DO(THEN(a,f)).threadPool("com.yomahub.liteflow.test.chainThreadPool.CustomLoopThreadExecutor");
</chain>
<chain name="chain3"
thread-pool-executor-class="com.yomahub.liteflow.test.chainThreadPool.CustomChainThreadExecutor">
WHILE(z).parallel(true).DO(THEN(w,d)).threadPool("com.yomahub.liteflow.test.chainThreadPool.CustomLoopThreadExecutor");
</chain>
<chain name="chain4"
thread-pool-executor-class="com.yomahub.liteflow.test.chainThreadPool.CustomChainThreadExecutor">
ITERATOR(it).parallel(true).DO(THEN(a,i)).threadPool("com.yomahub.liteflow.test.chainThreadPool.CustomLoopThreadExecutor");
</chain>
</flow>

View File

@@ -0,0 +1,19 @@
<?xml version="1.0" encoding="UTF-8"?>
<flow>
<chain name="chain1">
WHEN(a,b);
</chain>
<chain name="chain2">
FOR(5).parallel(true).DO(THEN(a,f)
);
</chain>
<chain name="chain3">
WHILE(z).parallel(true).DO(THEN(w,d));
</chain>
<chain name="chain4">
ITERATOR(it).parallel(true).DO(THEN(a,i));
</chain>
</flow>

View File

@@ -1,4 +1,4 @@
liteflow.rule-source=parallelLoop/flow.xml
liteflow.parallel-max-workers = 10
liteflow.parallel-queue-limit = 1024
liteflow.parallel-loop-executor-class =com.yomahub.liteflow.test.parallelLoop.CustomThreadExecutor
liteflow.global-thread-pool-size=16
liteflow.global-thread-pool-queue-size=512
liteflow.global-thread-pool-executor-class=com.yomahub.liteflow.test.parallelLoop.CustomThreadExecutor