diff --git a/liteflow-testcase-el/liteflow-testcase-el-springnative/src/test/java/com/yomahub/liteflow/test/parallelLoop/CustomStatefulException.java b/liteflow-testcase-el/liteflow-testcase-el-springnative/src/test/java/com/yomahub/liteflow/test/parallelLoop/CustomStatefulException.java new file mode 100644 index 000000000..10cda69b6 --- /dev/null +++ b/liteflow-testcase-el/liteflow-testcase-el-springnative/src/test/java/com/yomahub/liteflow/test/parallelLoop/CustomStatefulException.java @@ -0,0 +1,14 @@ +package com.yomahub.liteflow.test.parallelLoop; + +import com.yomahub.liteflow.exception.LiteFlowException; + +/** + * 用户自定义带状态码的异常 + */ +public class CustomStatefulException extends LiteFlowException { + + public CustomStatefulException(String code, String message) { + super(code, message); + } + +} diff --git a/liteflow-testcase-el/liteflow-testcase-el-springnative/src/test/java/com/yomahub/liteflow/test/parallelLoop/CustomThreadExecutor.java b/liteflow-testcase-el/liteflow-testcase-el-springnative/src/test/java/com/yomahub/liteflow/test/parallelLoop/CustomThreadExecutor.java new file mode 100644 index 000000000..72773f38a --- /dev/null +++ b/liteflow-testcase-el/liteflow-testcase-el-springnative/src/test/java/com/yomahub/liteflow/test/parallelLoop/CustomThreadExecutor.java @@ -0,0 +1,23 @@ +package com.yomahub.liteflow.test.parallelLoop; + +import cn.hutool.core.util.ObjectUtil; +import com.yomahub.liteflow.property.LiteflowConfig; +import com.yomahub.liteflow.property.LiteflowConfigGetter; +import com.yomahub.liteflow.thread.ExecutorBuilder; + +import java.util.concurrent.ExecutorService; + +public class CustomThreadExecutor implements ExecutorBuilder { + + @Override + public ExecutorService buildExecutor() { + LiteflowConfig liteflowConfig = LiteflowConfigGetter.get(); + // 只有在非spring的场景下liteflowConfig才会为null + if (ObjectUtil.isNull(liteflowConfig)) { + liteflowConfig = new LiteflowConfig(); + } + return buildDefaultExecutor(liteflowConfig.getParallelMaxWorkers(), liteflowConfig.getParallelMaxWorkers(), + liteflowConfig.getParallelQueueLimit(), "customer-loop-thead-"); + } + +} diff --git a/liteflow-testcase-el/liteflow-testcase-el-springnative/src/test/java/com/yomahub/liteflow/test/parallelLoop/ParallelLoopELSpringTest.java b/liteflow-testcase-el/liteflow-testcase-el-springnative/src/test/java/com/yomahub/liteflow/test/parallelLoop/ParallelLoopELSpringTest.java new file mode 100644 index 000000000..c72aa81dd --- /dev/null +++ b/liteflow-testcase-el/liteflow-testcase-el-springnative/src/test/java/com/yomahub/liteflow/test/parallelLoop/ParallelLoopELSpringTest.java @@ -0,0 +1,114 @@ +package com.yomahub.liteflow.test.parallelLoop; + +import cn.hutool.core.collection.ListUtil; +import com.yomahub.liteflow.core.FlowExecutor; +import com.yomahub.liteflow.exception.LiteFlowException; +import com.yomahub.liteflow.flow.LiteflowResponse; +import com.yomahub.liteflow.slot.DefaultContext; +import com.yomahub.liteflow.test.BaseTest; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + + +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit.jupiter.SpringExtension; + +import javax.annotation.Resource; +import java.util.List; +import java.util.regex.Pattern; + +/** + * springboot环境EL异步循环测试 + * + * @author zhhhhy + * @since 2.11.0 + */ +@ExtendWith(SpringExtension.class) +@ContextConfiguration("classpath:/parallelLoop/application.xml") +public class ParallelLoopELSpringTest extends BaseTest { + + @Resource + private FlowExecutor flowExecutor; + + //测试并行FOR循环,循环次数直接在el中定义 + @Test + public void testParallelLoop1() throws Exception { + LiteflowResponse response = flowExecutor.execute2Resp("chain1", "arg"); + Assertions.assertTrue(response.isSuccess()); + } + + //测试并行FOR循环,循环次数由For组件定义 + @Test + public void testParallelLoop2() throws Exception { + LiteflowResponse response = flowExecutor.execute2Resp("chain2", "arg"); + Assertions.assertTrue(response.isSuccess()); + } + + //测试并行FOR循环中的BREAK组件能够正常发挥作用 + @Test + public void testParallelLoop3() throws Exception { + LiteflowResponse response = flowExecutor.execute2Resp("chain3", "arg"); + Assertions.assertTrue(response.isSuccess()); + } + + //测试并行FOR循环中,主线程是否会正常等待所有并行子项完成后再继续执行 + @Test + public void testParallelLoop4() throws Exception { + LiteflowResponse response = flowExecutor.execute2Resp("chain4", "arg"); + Assertions.assertTrue(response.isSuccess()); + } + + @Test + //测试并行FOR循环中,某个并行子项抛出异常 + public void testParallelLoop5() throws Exception { + LiteflowResponse response = flowExecutor.execute2Resp("chain5", "arg"); + Assertions.assertFalse(response.isSuccess()); + Assertions.assertEquals("300", response.getCode()); + Assertions.assertNotNull(response.getCause()); + Assertions.assertTrue(response.getCause() instanceof LiteFlowException); + Assertions.assertNotNull(response.getSlot()); + } + + //并行的条件循环 + @Test + public void testParallelLoop6() throws Exception { + LiteflowResponse response = flowExecutor.execute2Resp("chain6", "arg"); + Assertions.assertTrue(response.isSuccess()); + } + + //并行的迭代循环 + @Test + public void testParallelLoop7() throws Exception { + List list = ListUtil.toList("1", "2", "3", "4", "5"); + LiteflowResponse response = flowExecutor.execute2Resp("chain7", list); + Assertions.assertTrue(response.isSuccess()); + } + + //测试并行FOR循环中的index + @Test + public void testParallelLoop8() throws Exception { + LiteflowResponse response = flowExecutor.execute2Resp("chain8", "arg"); + DefaultContext context = response.getFirstContextBean(); + Assertions.assertTrue(response.isSuccess()); + String regex = "(?!.*(.).*\\1)[0-4]{5}"; //匹配不包含重复数字的0-4的5位数字 + Pattern pattern = Pattern.compile(regex); + //e1,e2,e3分别并行执行5次,因此单个循环的顺序可以是任意的 + Assertions.assertTrue(pattern.matcher(context.getData("loop_e1")).matches()); + Assertions.assertTrue(pattern.matcher(context.getData("loop_e2")).matches()); + Assertions.assertTrue(pattern.matcher(context.getData("loop_e3")).matches()); + } + + + //测试自定义线程池配置是否生效 + @Test + public void testParallelLoop9() throws Exception { + LiteflowResponse response = flowExecutor.execute2Resp("chain9", "arg"); + DefaultContext context = response.getFirstContextBean(); + Assertions.assertTrue(response.isSuccess()); + Assertions.assertTrue(context.getData("threadName").toString().startsWith("customer-loop-thead")); + } + + + +} diff --git a/liteflow-testcase-el/liteflow-testcase-el-springnative/src/test/java/com/yomahub/liteflow/test/parallelLoop/cmp/ACmp.java b/liteflow-testcase-el/liteflow-testcase-el-springnative/src/test/java/com/yomahub/liteflow/test/parallelLoop/cmp/ACmp.java new file mode 100644 index 000000000..48ac5b6ac --- /dev/null +++ b/liteflow-testcase-el/liteflow-testcase-el-springnative/src/test/java/com/yomahub/liteflow/test/parallelLoop/cmp/ACmp.java @@ -0,0 +1,21 @@ +/** + *

Title: liteflow

+ *

Description: 轻量级的组件式流程框架

+ * @author Bryan.Zhang + * @email weenyc31@163.com + * @Date 2020/4/1 + */ +package com.yomahub.liteflow.test.parallelLoop.cmp; + +import com.yomahub.liteflow.core.NodeComponent; +import org.springframework.stereotype.Component; + +@Component("a") +public class ACmp extends NodeComponent{ + + @Override + public void process() { + System.out.println("ACmp executed!"); + } + +} diff --git a/liteflow-testcase-el/liteflow-testcase-el-springnative/src/test/java/com/yomahub/liteflow/test/parallelLoop/cmp/BCmp.java b/liteflow-testcase-el/liteflow-testcase-el-springnative/src/test/java/com/yomahub/liteflow/test/parallelLoop/cmp/BCmp.java new file mode 100644 index 000000000..3a5c266f9 --- /dev/null +++ b/liteflow-testcase-el/liteflow-testcase-el-springnative/src/test/java/com/yomahub/liteflow/test/parallelLoop/cmp/BCmp.java @@ -0,0 +1,21 @@ +/** + *

Title: liteflow

+ *

Description: 轻量级的组件式流程框架

+ * @author Bryan.Zhang + * @email weenyc31@163.com + * @Date 2020/4/1 + */ +package com.yomahub.liteflow.test.parallelLoop.cmp; + +import com.yomahub.liteflow.core.NodeComponent; +import org.springframework.stereotype.Component; + +@Component("b") +public class BCmp extends NodeComponent{ + + @Override + public void process() { + System.out.println("BCmp executed!"); + } + +} diff --git a/liteflow-testcase-el/liteflow-testcase-el-springnative/src/test/java/com/yomahub/liteflow/test/parallelLoop/cmp/CCmp.java b/liteflow-testcase-el/liteflow-testcase-el-springnative/src/test/java/com/yomahub/liteflow/test/parallelLoop/cmp/CCmp.java new file mode 100644 index 000000000..35ac18198 --- /dev/null +++ b/liteflow-testcase-el/liteflow-testcase-el-springnative/src/test/java/com/yomahub/liteflow/test/parallelLoop/cmp/CCmp.java @@ -0,0 +1,21 @@ +/** + *

Title: liteflow

+ *

Description: 轻量级的组件式流程框架

+ * @author Bryan.Zhang + * @email weenyc31@163.com + * @Date 2020/4/1 + */ +package com.yomahub.liteflow.test.parallelLoop.cmp; + +import com.yomahub.liteflow.core.NodeComponent; +import org.springframework.stereotype.Component; + +@Component("c") +public class CCmp extends NodeComponent{ + + @Override + public void process() { + System.out.println("CCmp executed!"); + } + +} diff --git a/liteflow-testcase-el/liteflow-testcase-el-springnative/src/test/java/com/yomahub/liteflow/test/parallelLoop/cmp/DCmp.java b/liteflow-testcase-el/liteflow-testcase-el-springnative/src/test/java/com/yomahub/liteflow/test/parallelLoop/cmp/DCmp.java new file mode 100644 index 000000000..affd1d340 --- /dev/null +++ b/liteflow-testcase-el/liteflow-testcase-el-springnative/src/test/java/com/yomahub/liteflow/test/parallelLoop/cmp/DCmp.java @@ -0,0 +1,30 @@ +/** + *

Title: liteflow

+ *

Description: 轻量级的组件式流程框架

+ * @author Bryan.Zhang + * @email weenyc31@163.com + * @Date 2020/4/1 + */ +package com.yomahub.liteflow.test.parallelLoop.cmp; + +import com.yomahub.liteflow.core.NodeComponent; +import com.yomahub.liteflow.slot.DefaultContext; +import org.springframework.stereotype.Component; + +@Component("d") +public class DCmp extends NodeComponent{ + + @Override + public void process() { + DefaultContext context = this.getFirstContextBean(); + String key = "test"; + if (context.hasData(key)) { + int count = context.getData(key); + context.setData(key, ++count); + } + else { + context.setData(key, 1); + } + } + +} diff --git a/liteflow-testcase-el/liteflow-testcase-el-springnative/src/test/java/com/yomahub/liteflow/test/parallelLoop/cmp/ECmp.java b/liteflow-testcase-el/liteflow-testcase-el-springnative/src/test/java/com/yomahub/liteflow/test/parallelLoop/cmp/ECmp.java new file mode 100644 index 000000000..40dbd35bb --- /dev/null +++ b/liteflow-testcase-el/liteflow-testcase-el-springnative/src/test/java/com/yomahub/liteflow/test/parallelLoop/cmp/ECmp.java @@ -0,0 +1,33 @@ +/** + *

Title: liteflow

+ *

Description: 轻量级的组件式流程框架

+ * @author Bryan.Zhang + * @email weenyc31@163.com + * @Date 2020/4/1 + */ +package com.yomahub.liteflow.test.parallelLoop.cmp; + +import cn.hutool.core.util.StrUtil; +import com.yomahub.liteflow.core.NodeComponent; +import com.yomahub.liteflow.slot.DefaultContext; +import org.springframework.stereotype.Component; + +@Component("e") +public class ECmp extends NodeComponent{ + + //注意与串行的ECmp相比,并行的ECmp的process方法必须保证线程安全 + @Override + public synchronized void process() { + DefaultContext context = this.getFirstContextBean(); + String key = StrUtil.format("{}_{}", "loop", this.getTag()); + if (context.hasData(key)) { + String loopStr = context.getData(key); + String loopStrReturn = StrUtil.format("{}{}", loopStr, this.getLoopIndex()); + context.setData(key, loopStrReturn); + } + else { + context.setData(key, this.getLoopIndex().toString()); + } + } + +} diff --git a/liteflow-testcase-el/liteflow-testcase-el-springnative/src/test/java/com/yomahub/liteflow/test/parallelLoop/cmp/FCmp.java b/liteflow-testcase-el/liteflow-testcase-el-springnative/src/test/java/com/yomahub/liteflow/test/parallelLoop/cmp/FCmp.java new file mode 100644 index 000000000..b5f10ee90 --- /dev/null +++ b/liteflow-testcase-el/liteflow-testcase-el-springnative/src/test/java/com/yomahub/liteflow/test/parallelLoop/cmp/FCmp.java @@ -0,0 +1,20 @@ +package com.yomahub.liteflow.test.parallelLoop.cmp; + +import com.yomahub.liteflow.core.NodeComponent; +import org.springframework.stereotype.Component; + +@Component("f") +public class FCmp extends NodeComponent{ + + @Override + public void process() { + try { + System.out.println("FCmp start to sleep 5s"); + Thread.sleep(5000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + System.out.println("FCmp executed!"); + } + +} diff --git a/liteflow-testcase-el/liteflow-testcase-el-springnative/src/test/java/com/yomahub/liteflow/test/parallelLoop/cmp/GCmp.java b/liteflow-testcase-el/liteflow-testcase-el-springnative/src/test/java/com/yomahub/liteflow/test/parallelLoop/cmp/GCmp.java new file mode 100644 index 000000000..d135d440e --- /dev/null +++ b/liteflow-testcase-el/liteflow-testcase-el-springnative/src/test/java/com/yomahub/liteflow/test/parallelLoop/cmp/GCmp.java @@ -0,0 +1,18 @@ +package com.yomahub.liteflow.test.parallelLoop.cmp; + +import com.yomahub.liteflow.core.NodeComponent; +import com.yomahub.liteflow.test.exception.CustomStatefulException; +import org.springframework.stereotype.Component; + +@Component("g") +public class GCmp extends NodeComponent{ + + @Override + public void process() { + if(this.getLoopIndex()==1){ + throw new CustomStatefulException("300", "chain execute custom stateful execption"); + } + System.out.println("GCmp executed!"); + } + +} diff --git a/liteflow-testcase-el/liteflow-testcase-el-springnative/src/test/java/com/yomahub/liteflow/test/parallelLoop/cmp/HCmp.java b/liteflow-testcase-el/liteflow-testcase-el-springnative/src/test/java/com/yomahub/liteflow/test/parallelLoop/cmp/HCmp.java new file mode 100644 index 000000000..8b2fcda87 --- /dev/null +++ b/liteflow-testcase-el/liteflow-testcase-el-springnative/src/test/java/com/yomahub/liteflow/test/parallelLoop/cmp/HCmp.java @@ -0,0 +1,18 @@ +package com.yomahub.liteflow.test.parallelLoop.cmp; + +import com.yomahub.liteflow.core.NodeComponent; +import com.yomahub.liteflow.slot.DefaultContext; +import org.springframework.stereotype.Component; + + +@Component("h") +public class HCmp extends NodeComponent{ + + @Override + public void process() { + DefaultContext context = this.getFirstContextBean(); + context.setData("threadName", Thread.currentThread().getName()); + System.out.println("HCmp executed!"); + } + +} diff --git a/liteflow-testcase-el/liteflow-testcase-el-springnative/src/test/java/com/yomahub/liteflow/test/parallelLoop/cmp/ITCmp.java b/liteflow-testcase-el/liteflow-testcase-el-springnative/src/test/java/com/yomahub/liteflow/test/parallelLoop/cmp/ITCmp.java new file mode 100644 index 000000000..b11e98ab8 --- /dev/null +++ b/liteflow-testcase-el/liteflow-testcase-el-springnative/src/test/java/com/yomahub/liteflow/test/parallelLoop/cmp/ITCmp.java @@ -0,0 +1,17 @@ +package com.yomahub.liteflow.test.parallelLoop.cmp; + +import com.yomahub.liteflow.core.NodeIteratorComponent; +import org.springframework.stereotype.Component; + +import java.util.Iterator; +import java.util.List; + +@Component("it") +public class ITCmp extends NodeIteratorComponent { + + @Override + public Iterator processIterator() throws Exception { + List list = this.getRequestData(); + return list.iterator(); + } +} diff --git a/liteflow-testcase-el/liteflow-testcase-el-springnative/src/test/java/com/yomahub/liteflow/test/parallelLoop/cmp/XCmp.java b/liteflow-testcase-el/liteflow-testcase-el-springnative/src/test/java/com/yomahub/liteflow/test/parallelLoop/cmp/XCmp.java new file mode 100644 index 000000000..c9cfc967b --- /dev/null +++ b/liteflow-testcase-el/liteflow-testcase-el-springnative/src/test/java/com/yomahub/liteflow/test/parallelLoop/cmp/XCmp.java @@ -0,0 +1,14 @@ +package com.yomahub.liteflow.test.parallelLoop.cmp; + +import com.yomahub.liteflow.core.NodeForComponent; +import org.springframework.stereotype.Component; + +@Component("x") +public class XCmp extends NodeForComponent { + + @Override + public int processFor() throws Exception { + return 3; + } + +} diff --git a/liteflow-testcase-el/liteflow-testcase-el-springnative/src/test/java/com/yomahub/liteflow/test/parallelLoop/cmp/YCmp.java b/liteflow-testcase-el/liteflow-testcase-el-springnative/src/test/java/com/yomahub/liteflow/test/parallelLoop/cmp/YCmp.java new file mode 100644 index 000000000..8975f7f3b --- /dev/null +++ b/liteflow-testcase-el/liteflow-testcase-el-springnative/src/test/java/com/yomahub/liteflow/test/parallelLoop/cmp/YCmp.java @@ -0,0 +1,19 @@ +package com.yomahub.liteflow.test.parallelLoop.cmp; + +import com.yomahub.liteflow.core.NodeBreakComponent; +import com.yomahub.liteflow.slot.DefaultContext; +import org.springframework.stereotype.Component; + +@Component("y") +public class YCmp extends NodeBreakComponent { + + @Override + public boolean processBreak() throws Exception { + DefaultContext context = this.getFirstContextBean(); + int count = 0; + if(context.hasData("test")) { + count = context.getData("test"); + } + return count > 3; + } +} diff --git a/liteflow-testcase-el/liteflow-testcase-el-springnative/src/test/java/com/yomahub/liteflow/test/parallelLoop/cmp/ZCmp.java b/liteflow-testcase-el/liteflow-testcase-el-springnative/src/test/java/com/yomahub/liteflow/test/parallelLoop/cmp/ZCmp.java new file mode 100644 index 000000000..844a6ff25 --- /dev/null +++ b/liteflow-testcase-el/liteflow-testcase-el-springnative/src/test/java/com/yomahub/liteflow/test/parallelLoop/cmp/ZCmp.java @@ -0,0 +1,21 @@ +package com.yomahub.liteflow.test.parallelLoop.cmp; + +import com.yomahub.liteflow.core.NodeWhileComponent; +import com.yomahub.liteflow.slot.DefaultContext; +import org.springframework.stereotype.Component; + +@Component("z") +public class ZCmp extends NodeWhileComponent { + + @Override + public boolean processWhile() throws Exception { + DefaultContext context = this.getFirstContextBean(); + String key = "test"; + if (context.hasData(key)) { + int count = context.getData("test"); + return count < 5; + } else { + return true; + } + } +} diff --git a/liteflow-testcase-el/liteflow-testcase-el-springnative/src/test/resources/parallelLoop/application.xml b/liteflow-testcase-el/liteflow-testcase-el-springnative/src/test/resources/parallelLoop/application.xml new file mode 100644 index 000000000..ee28d7f85 --- /dev/null +++ b/liteflow-testcase-el/liteflow-testcase-el-springnative/src/test/resources/parallelLoop/application.xml @@ -0,0 +1,26 @@ + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/liteflow-testcase-el/liteflow-testcase-el-springnative/src/test/resources/parallelLoop/flow.xml b/liteflow-testcase-el/liteflow-testcase-el-springnative/src/test/resources/parallelLoop/flow.xml new file mode 100644 index 000000000..c68bbd7c3 --- /dev/null +++ b/liteflow-testcase-el/liteflow-testcase-el-springnative/src/test/resources/parallelLoop/flow.xml @@ -0,0 +1,46 @@ + + + + FOR(2).parallel(true).DO(THEN(a,b,c)); + + + + FOR(x).parallel(true).DO(THEN(a,b,c)); + + + + FOR(100).parallel(true).DO(THEN(a,b,d)).BREAK(y); + + + + FOR(x).parallel(true).DO(THEN(a,b,f)); + + + + FOR(x).parallel(true).DO(THEN(a,b,g)); + + + + WHILE(z).parallel(true).DO(THEN(a,d)); + + + + ITERATOR(it).parallel(true).DO(THEN(a,b)); + + + + FOR(5).parallel(true).DO( + WHEN( + THEN(a,e.tag("e1")), + THEN(c,e.tag("e2")), + THEN(b,e.tag("e3")) + ) + ); + + + + + FOR(x).parallel(true).DO(THEN(a,b,h)); + + + \ No newline at end of file