From 47fe94b2b4d68879f57dc109aaf0654f5ec831f0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BE=90=E4=BD=B3?= Date: Thu, 25 Mar 2021 17:22:45 +0800 Subject: [PATCH] =?UTF-8?q?enhancement=20#I37QVR=20WhenCondition=E6=97=B6?= =?UTF-8?q?=E5=80=99=EF=BC=8C=E5=B9=B6=E5=8F=91=E6=89=A7=E8=A1=8C=E7=9B=AE?= =?UTF-8?q?=E5=89=8D=E4=BC=9A=E6=AF=8F=E6=AC=A1=E6=96=B0=E5=BB=BA=E7=BA=BF?= =?UTF-8?q?=E7=A8=8B=E5=8F=AF=E4=B8=8D=E5=8F=AF=E8=B5=B0=E7=BA=BF=E7=A8=8B?= =?UTF-8?q?=E6=B1=A0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../yomahub/liteflow/core/FlowExecutor.java | 16 +++ .../yomahub/liteflow/entity/flow/Chain.java | 59 +++++++- ...tionThread.java => ParallelCondition.java} | 17 ++- .../liteflow/entity/flow/WhenCondition.java | 12 ++ .../liteflow/parser/XmlFlowParser.java | 11 +- .../yomahub/liteflow/util/ExecutorHelper.java | 94 ++++++++++++ .../LiteflowExecutorAutoConfiguration.java | 38 +++++ .../LiteflowMainAutoConfiguration.java | 6 +- .../yomahub/liteflow/springboot/Shutdown.java | 37 +++++ .../main/resources/META-INF/spring.factories | 1 + .../flowtest/concurrent/ConcurrentCase.java | 135 ++++++++++++++++++ .../flowtest/concurrent/SpringBootApp.java | 24 ++++ .../flowtest/concurrent/TestParseFlow.java | 85 +++++++++++ .../flowtest/concurrent/TestRunFlow.java | 81 +++++++++++ .../mock/component/c/C10Component.java | 26 ++++ .../mock/component/c/C1Component.java | 26 ++++ .../mock/component/c/C2Component.java | 26 ++++ .../mock/component/c/C3Component.java | 26 ++++ .../mock/component/c/C4Component.java | 26 ++++ .../mock/component/c/C5Component.java | 26 ++++ .../mock/component/c/C6Component.java | 26 ++++ .../mock/component/c/C7Component.java | 26 ++++ .../mock/component/c/C8Component.java | 26 ++++ .../mock/component/c/C9Component.java | 26 ++++ .../mock/component/p/P3Component.java | 25 ++++ .../mock/component/p/P4Component.java | 25 ++++ .../mock/component/p/P5Component.java | 25 ++++ .../mock/component/p/P6Component.java | 25 ++++ .../mock/component/p/P7Component.java | 25 ++++ .../mock/component/p/P8Component.java | 25 ++++ .../mock/component/s/S1Component.java | 25 ++++ .../mock/component/s/S2Component.java | 25 ++++ .../mock/component/s/S3Component.java | 25 ++++ .../mock/component/s/S4Component.java | 25 ++++ .../mock/component/s/S5Component.java | 25 ++++ .../mock/component/s/S6Component.java | 25 ++++ .../src/test/resources/application-test.yml | 18 +++ .../src/test/resources/config/flow-test.xml | 20 +++ 38 files changed, 1200 insertions(+), 14 deletions(-) rename liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/{WhenConditionThread.java => ParallelCondition.java} (54%) create mode 100644 liteflow-core/src/main/java/com/yomahub/liteflow/util/ExecutorHelper.java create mode 100644 liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/LiteflowExecutorAutoConfiguration.java create mode 100644 liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/Shutdown.java create mode 100644 liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/ConcurrentCase.java create mode 100644 liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/SpringBootApp.java create mode 100644 liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/TestParseFlow.java create mode 100644 liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/TestRunFlow.java create mode 100644 liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/c/C10Component.java create mode 100644 liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/c/C1Component.java create mode 100644 liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/c/C2Component.java create mode 100644 liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/c/C3Component.java create mode 100644 liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/c/C4Component.java create mode 100644 liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/c/C5Component.java create mode 100644 liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/c/C6Component.java create mode 100644 liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/c/C7Component.java create mode 100644 liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/c/C8Component.java create mode 100644 liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/c/C9Component.java create mode 100644 liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/p/P3Component.java create mode 100644 liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/p/P4Component.java create mode 100644 liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/p/P5Component.java create mode 100644 liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/p/P6Component.java create mode 100644 liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/p/P7Component.java create mode 100644 liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/p/P8Component.java create mode 100644 liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/s/S1Component.java create mode 100644 liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/s/S2Component.java create mode 100644 liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/s/S3Component.java create mode 100644 liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/s/S4Component.java create mode 100644 liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/s/S5Component.java create mode 100644 liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/s/S6Component.java create mode 100644 liteflow-test-springboot/src/test/resources/application-test.yml create mode 100644 liteflow-test-springboot/src/test/resources/config/flow-test.xml diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/core/FlowExecutor.java b/liteflow-core/src/main/java/com/yomahub/liteflow/core/FlowExecutor.java index 6a9ff0bd2..fc8a09c4a 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/core/FlowExecutor.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/core/FlowExecutor.java @@ -33,6 +33,8 @@ import com.yomahub.liteflow.parser.LocalXmlFlowParser; import com.yomahub.liteflow.parser.XmlFlowParser; import com.yomahub.liteflow.parser.ZookeeperXmlFlowParser; +import java.util.concurrent.ExecutorService; + /** * 流程规则主要执行器类 * @author Bryan.Zhang @@ -43,6 +45,8 @@ public class FlowExecutor { private LiteflowConfig liteflowConfig; + private ExecutorService parallelExecutor; + private String zkNode; //FlowExecutor的初始化化方式,主要用于parse规则文件 @@ -126,6 +130,10 @@ public class FlowExecutor { throw new ChainNotFoundException(errorMsg); } + if (parallelExecutor != null) { + chain.setParallelExecutor(parallelExecutor); + } + if(!isInnerChain && slotIndex == null) { slotIndex = DataBus.offerSlot(slotClazz); LOG.info("slot[{}] offered",slotIndex); @@ -183,4 +191,12 @@ public class FlowExecutor { public void setLiteflowConfig(LiteflowConfig liteflowConfig) { this.liteflowConfig = liteflowConfig; } + + public ExecutorService getParallelExecutor() { + return parallelExecutor; + } + + public void setParallelExecutor(ExecutorService parallelExecutor) { + this.parallelExecutor = parallelExecutor; + } } diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/Chain.java b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/Chain.java index 320b10ee1..d637428d8 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/Chain.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/Chain.java @@ -23,6 +23,9 @@ import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.ArrayList; +import java.util.concurrent.*; + /** * chain对象,实现可执行器 * @author Bryan.Zhang @@ -37,6 +40,8 @@ public class Chain implements Executable { private static int whenMaxWaitSeconds; + private ExecutorService parallelExecutor; + static { LiteflowConfig liteflowConfig = SpringAware.getBean(LiteflowConfig.class); if (ObjectUtil.isNotNull(liteflowConfig)) { @@ -51,6 +56,14 @@ public class Chain implements Executable { this.conditionList = conditionList; } + public ExecutorService getParallelExecutor() { + return parallelExecutor; + } + + public void setParallelExecutor(ExecutorService parallelExecutor) { + this.parallelExecutor = parallelExecutor; + } + public List getConditionList() { return conditionList; } @@ -88,11 +101,18 @@ public class Chain implements Executable { } } } else if (condition instanceof WhenCondition) { - final CountDownLatch latch = new CountDownLatch(condition.getNodeList().size()); - for (Executable executableItem : condition.getNodeList()) { - new WhenConditionThread(executableItem, slotIndex, slot.getRequestId(), latch).start(); + /** + for (Executable executableItem : condition.getNodeList()) { + * 设置了线程池且当前condition isSync = true时,使用异步线程池执行 + new WhenConditionThread(executableItem, slotIndex, slot.getRequestId(), latch).start(); + */ + if (((WhenCondition) condition).isASync() && parallelExecutor != null) { + executeAsyncCondition((WhenCondition) condition, slotIndex, slot.getRequestId()); + } else { + for (Executable executableItem : condition.getNodeList()) { + executableItem.execute(slotIndex); + } } - latch.await(whenMaxWaitSeconds, TimeUnit.SECONDS); } } } @@ -106,4 +126,35 @@ public class Chain implements Executable { public String getExecuteName() { return chainName; } + + + /** + * 使用线程池执行并发流程 + * @param condition + * @param slotIndex + * @param requestId + */ + private void executeAsyncCondition(WhenCondition condition, Integer slotIndex, String requestId) { + final CountDownLatch latch = new CountDownLatch(condition.getNodeList().size()); + final List> futures = new ArrayList<>(condition.getNodeList().size()); + + for (int i = 0; i < condition.getNodeList().size(); i++) { + futures.add(parallelExecutor.submit( + new ParallelCondition(condition.getNodeList().get(i), slotIndex, requestId, latch) + )); + } + + try { + if (!latch.await(whenMaxWaitSeconds, TimeUnit.SECONDS)) { + for (Future f : futures) { + f.cancel(true); + } + } + LOG.warn("requestId [{}] executing async condition has reached max-wait-seconds, condition canceled and move to next executableItem." + , requestId); + } catch (InterruptedException e) { + // ignore InterruptedException + + } + } } diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/WhenConditionThread.java b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/ParallelCondition.java similarity index 54% rename from liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/WhenConditionThread.java rename to liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/ParallelCondition.java index bc42440d0..cbfe0e1e3 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/WhenConditionThread.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/ParallelCondition.java @@ -2,6 +2,7 @@ package com.yomahub.liteflow.entity.flow; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; @@ -9,9 +10,9 @@ import java.util.concurrent.CountDownLatch; * 并行器线程 * @author Bryan.Zhang */ -public class WhenConditionThread extends Thread { +public class ParallelCondition implements Callable { - private static final Logger LOG = LoggerFactory.getLogger(WhenConditionThread.class); + private static final Logger LOG = LoggerFactory.getLogger(ParallelCondition.class); private Executable executableItem; @@ -21,7 +22,7 @@ public class WhenConditionThread extends Thread { private CountDownLatch latch; - public WhenConditionThread(Executable executableItem,Integer slotIndex,String requestId,CountDownLatch latch){ + public ParallelCondition(Executable executableItem, Integer slotIndex, String requestId, CountDownLatch latch) { this.executableItem = executableItem; this.slotIndex = slotIndex; this.requestId = requestId; @@ -29,13 +30,15 @@ public class WhenConditionThread extends Thread { } @Override - public void run() { - try{ + public Boolean call() throws Exception { + try { executableItem.execute(slotIndex); }catch(Exception e){ - LOG.error("item [{}] execute cause error",executableItem.getExecuteName(),e); - }finally{ + LOG.error("requestId [{}], item [{}] execute cause error", requestId, executableItem.getExecuteName(), e); + } finally { latch.countDown(); } + + return true; } } diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/WhenCondition.java b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/WhenCondition.java index fbcd9715e..da80489a5 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/WhenCondition.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/WhenCondition.java @@ -14,9 +14,21 @@ import java.util.List; * @author Bryan.Zhang */ public class WhenCondition extends Condition{ + /** + * 增加isSync属性,以区分是循序执行还是并发执行 + */ + private boolean isASync; public WhenCondition(List nodeList) { super(nodeList); + isASync = true; + } + public WhenCondition(List nodeList, boolean isASync) { + super(nodeList); + this.isASync = isASync; } + public boolean isASync() { + return isASync; + } } diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/parser/XmlFlowParser.java b/liteflow-core/src/main/java/com/yomahub/liteflow/parser/XmlFlowParser.java index 372524838..606c5cddf 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/parser/XmlFlowParser.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/parser/XmlFlowParser.java @@ -22,6 +22,7 @@ import org.slf4j.LoggerFactory; import com.yomahub.liteflow.core.NodeComponent; import com.yomahub.liteflow.flow.FlowBus; import com.yomahub.liteflow.spring.ComponentScaner; +import org.dom4j.Attribute; /** * xml形式的解析器 @@ -139,7 +140,15 @@ public abstract class XmlFlowParser { if (condE.getName().equals("then")) { conditionList.add(new ThenCondition(chainNodeList)); } else if (condE.getName().equals("when")) { - conditionList.add(new WhenCondition(chainNodeList)); + /** + * 设置是否为async异步 + */ + Attribute isSync = condE.attribute("async"); + if (isSync != null) { + conditionList.add(new WhenCondition(chainNodeList, isSync.getValue().equals("true"))); + } else { + conditionList.add(new WhenCondition(chainNodeList)); + } } } FlowBus.addChain(chainName, new Chain(chainName,conditionList)); diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/util/ExecutorHelper.java b/liteflow-core/src/main/java/com/yomahub/liteflow/util/ExecutorHelper.java new file mode 100644 index 000000000..7438ea303 --- /dev/null +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/util/ExecutorHelper.java @@ -0,0 +1,94 @@ +package com.yomahub.liteflow.util; + +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicLong; + +/** + * desc : + * name : ExecutorHelper + * + * @author : xujia + * date : 2021/3/24 + * @since : 1.8 + */ +public class ExecutorHelper { + + private ExecutorHelper() { + } + + /** + * 使用默认的等待时间1分钟,来关闭目标线程组。 + *

+ * + * @param pool 需要关闭的线程组. + */ + public static void shutdownAwaitTermination(ExecutorService pool) { + shutdownAwaitTermination(pool, 60L); + } + + /** + * 关闭ExecutorService的线程管理者 + *

+ * + * @param pool 需要关闭的管理者 + * @param timeout 等待时间 + */ + public static void shutdownAwaitTermination(ExecutorService pool, + long timeout) { + pool.shutdown(); + try { + if (!pool.awaitTermination(timeout, TimeUnit.SECONDS)) { + pool.shutdownNow(); + if (!pool.awaitTermination(timeout, TimeUnit.SECONDS)) { + System.err.println("Pool did not terminate."); + } + } + } catch (InterruptedException ie) { + pool.shutdownNow(); + Thread.currentThread().interrupt(); + } + } + + /** + * 返回一个线程工厂,这是一个可以定义线程名字的线程工厂,返回的线程都将被命名. + * 创建的线程都是非后台线程. + * + * @param name 名称. + * @return 线程工厂实例. + */ + public static ThreadFactory buildExecutorFactory(final String name) { + return buildExecutorFactory(name, false); + } + + /** + * 返回一个线程工厂,这是一个可以定义线程名字的线程工厂,返回的线程都将被命名. + * + * @param name 名称. + * @param daemon 是否为后台线程. + * @return 线程工厂实例. + */ + public static ThreadFactory buildExecutorFactory(final String name, final boolean daemon) { + return new ThreadFactory() { + + private final AtomicLong number = new AtomicLong(); + + @Override + public Thread newThread(Runnable r) { + Thread newThread = Executors.defaultThreadFactory().newThread(r); + newThread.setName(name + "-" + number.getAndIncrement()); + newThread.setDaemon(daemon); + return newThread; + } + + }; + } + + public static ExecutorService buildExecutor(int worker, int queue, String namePrefix, boolean daemon) { + return new ThreadPoolExecutor(worker, worker, + 0L, TimeUnit.MILLISECONDS, + new ArrayBlockingQueue<>(queue), + buildExecutorFactory(namePrefix, daemon), + new ThreadPoolExecutor.AbortPolicy() + ); + } +} diff --git a/liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/LiteflowExecutorAutoConfiguration.java b/liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/LiteflowExecutorAutoConfiguration.java new file mode 100644 index 000000000..1a337697f --- /dev/null +++ b/liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/LiteflowExecutorAutoConfiguration.java @@ -0,0 +1,38 @@ +package com.yomahub.liteflow.springboot; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.util.concurrent.ExecutorService; + +import static com.yomahub.liteflow.util.ExecutorHelper.buildExecutor; + +/** + * desc : + * name : LiteflowExecutorAutoConfiguration + * + * @author : xujia + * date : 2021/3/24 + * @since : 1.8 + */ +@Configuration +public class LiteflowExecutorAutoConfiguration { + + @Bean("parallelExecutor") + public ExecutorService parallelExecutor( + @Value("${threadPool.parallel.worker:0}") int worker, + @Value("${threadPool.parallel.queue:512}") int queue) { + int useWorker = worker; + int useQueue = queue; + if (useWorker == 0) { + useWorker = Runtime.getRuntime().availableProcessors() + 1; + } + + if (useQueue < 512) { + useQueue = 512; + } + + return buildExecutor(useWorker, useQueue, "parallel-executors", false); + } +} diff --git a/liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/LiteflowMainAutoConfiguration.java b/liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/LiteflowMainAutoConfiguration.java index 3fe8a8a69..ff64e6166 100644 --- a/liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/LiteflowMainAutoConfiguration.java +++ b/liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/LiteflowMainAutoConfiguration.java @@ -20,6 +20,7 @@ import org.springframework.context.annotation.PropertySource; import javax.swing.*; import java.util.List; +import java.util.concurrent.ExecutorService; /** * 主要的业务装配器 @@ -29,15 +30,16 @@ import java.util.List; */ @Configuration @ConditionalOnBean(LiteflowConfig.class) -@AutoConfigureAfter(LiteflowPropertyAutoConfiguration.class) +@AutoConfigureAfter({LiteflowPropertyAutoConfiguration.class, LiteflowExecutorAutoConfiguration.class}) @Import(SpringAware.class) public class LiteflowMainAutoConfiguration { @Bean - public FlowExecutor flowExecutor(LiteflowConfig liteflowConfig){ + public FlowExecutor flowExecutor(LiteflowConfig liteflowConfig, ExecutorService parallelExecutor){ if(StrUtil.isNotBlank(liteflowConfig.getRuleSource())){ FlowExecutor flowExecutor = new FlowExecutor(); flowExecutor.setLiteflowConfig(liteflowConfig); + flowExecutor.setParallelExecutor(parallelExecutor); return flowExecutor; }else{ return null; diff --git a/liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/Shutdown.java b/liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/Shutdown.java new file mode 100644 index 000000000..07096eabb --- /dev/null +++ b/liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/Shutdown.java @@ -0,0 +1,37 @@ +package com.yomahub.liteflow.springboot; + +import com.yomahub.liteflow.util.ExecutorHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.core.annotation.Order; +import org.springframework.stereotype.Component; + +import javax.annotation.PreDestroy; +import javax.annotation.Resource; +import java.util.concurrent.ExecutorService; + +/** + * desc : + * name : Shutdown + * + * @author : xujia + * date : 2021/3/24 + * @since : 1.8 + */ +@Order(Integer.MIN_VALUE) +@Component +public class Shutdown { + + private static final Logger LOG = LoggerFactory.getLogger(Shutdown.class); + + @Resource(name = "parallelExecutor") + private ExecutorService parallelExecutor; + + @PreDestroy + public void destroy() throws Exception { + LOG.info("Start closing the parallel-executors..."); + ExecutorHelper.shutdownAwaitTermination(parallelExecutor, 3600); + LOG.info("Succeed closing the parallel-executors ok..."); + } + +} diff --git a/liteflow-spring-boot-starter/src/main/resources/META-INF/spring.factories b/liteflow-spring-boot-starter/src/main/resources/META-INF/spring.factories index 23669427e..85b579284 100644 --- a/liteflow-spring-boot-starter/src/main/resources/META-INF/spring.factories +++ b/liteflow-spring-boot-starter/src/main/resources/META-INF/spring.factories @@ -1,6 +1,7 @@ org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ com.yomahub.liteflow.springboot.LiteflowComponentScannerAutoConfiguration,\ com.yomahub.liteflow.springboot.LiteflowPropertyAutoConfiguration,\ + com.yomahub.liteflow.springboot.LiteflowExecutorAutoConfiguration,\ com.yomahub.liteflow.springboot.LiteflowMainAutoConfiguration diff --git a/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/ConcurrentCase.java b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/ConcurrentCase.java new file mode 100644 index 000000000..9bd1fa315 --- /dev/null +++ b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/ConcurrentCase.java @@ -0,0 +1,135 @@ +package com.yomahub.flowtest.concurrent; + +import org.junit.Assert; + +import java.util.AbstractMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; + + +/** + * desc : + * name : ConcurrentCase + * + * @author : xujia + * date : 2021/3/25 + * @since : 1.8 + */ +public class ConcurrentCase { + public static final Map, List>> CASES = new ConcurrentHashMap<>(); + + /** + * 初始化一个测试用例的预期 + * @param request + * @param expected + */ + public static void caseInit(String request, List expected) { + CASES.put(request, new AbstractMap.SimpleEntry, List>(expected, new CopyOnWriteArrayList<>())); + } + + /** + * 添加这个测试用例的实际 + * @param request + * @param actual + */ + public static void caseAdd(String request, Routers actual) { + CASES.computeIfPresent(request, (k, v) -> { + v.getValue().add(actual); + return v; + }); + } + + /** + * 测试当前的Expected与Actual是否相同 + * + * @param request + */ + public static void caseAssert(String request) { + AbstractMap.SimpleEntry, List> ca = CASES.get(request); + Assert.assertNotNull(ca); + + Assert.assertEquals(ca.getKey(), ca.getValue()); + + if (ca.getValue().size() > 0) { + Integer expectedIndex = null; + for (Routers actual : ca.getValue()) { + + if (expectedIndex == null) { + expectedIndex = actual.getIndex(); + } else { + Assert.assertEquals(expectedIndex.intValue(), actual.getIndex()); + } + } + } + } + + /** + * 测试当前的Expected与Actual是否相同 + * + * @param request + */ + public static void caseAssertRandom(String request) { + AbstractMap.SimpleEntry, List> ca = CASES.get(request); + Assert.assertNotNull(ca); + + Assert.assertEquals(ca.getKey().size(), ca.getValue().size()); + + if (ca.getValue().size() > 0) { + Integer expectedIndex = null; + for (Routers actual : ca.getValue()) { + boolean find = false; + for(Routers routers : ca.getKey()) { + if (routers.getValue().equals(actual.getValue())) { + find = true; + } + } + Assert.assertTrue(find); + + if (expectedIndex == null) { + expectedIndex = actual.getIndex(); + } else { + Assert.assertEquals(expectedIndex.intValue(), actual.getIndex()); + } + } + } + } + + + public static class Routers { + int index; + String value; + + public Routers(String value) { + this.index = -1; + this.value = value; + } + public Routers(int index, String value) { + this.index = index; + this.value = value; + } + + public int getIndex() { + return index; + } + + public String getValue() { + return value; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Routers routers = (Routers) o; + return value.equals(routers.value); + } + + @Override + public int hashCode() { + return Objects.hash(value); + } + } +} diff --git a/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/SpringBootApp.java b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/SpringBootApp.java new file mode 100644 index 000000000..2c113b5f0 --- /dev/null +++ b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/SpringBootApp.java @@ -0,0 +1,24 @@ +package com.yomahub.flowtest.concurrent; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration; + +/** + * desc : + * name : SpringBootApp + * + * @author : xujia + * date : 2021/3/3 + * @since : 1.8 + */ +@SpringBootApplication(exclude={DataSourceAutoConfiguration.class}) +public class SpringBootApp { + /** + * @param args + */ + public static void main(String[] args) { + SpringApplication.run(SpringBootApp.class, args); + } + +} diff --git a/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/TestParseFlow.java b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/TestParseFlow.java new file mode 100644 index 000000000..1f7946153 --- /dev/null +++ b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/TestParseFlow.java @@ -0,0 +1,85 @@ +package com.yomahub.flowtest.concurrent; + + +import com.yomahub.liteflow.entity.flow.Chain; +import com.yomahub.liteflow.entity.flow.Condition; +import com.yomahub.liteflow.entity.flow.ThenCondition; +import com.yomahub.liteflow.entity.flow.WhenCondition; +import com.yomahub.liteflow.flow.FlowBus; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.ActiveProfiles; +import org.springframework.test.context.junit4.SpringRunner; + +import java.util.AbstractMap; +import java.util.Arrays; +import java.util.List; + + +/** + * desc : + * name : TestParseFlow + * + * @author : xujia + * date : 2021/3/25 + * @since : 1.8 + */ +@ActiveProfiles("test") +@RunWith(SpringRunner.class) +@SpringBootTest +public class TestParseFlow { + + private Check caseAsync = new Check("async", Arrays.asList( + new AbstractMap.SimpleEntry<>(ThenCondition.class, null), + new AbstractMap.SimpleEntry<>(WhenCondition.class, false), + new AbstractMap.SimpleEntry<>(WhenCondition.class, true), + new AbstractMap.SimpleEntry<>(WhenCondition.class, true) + )); + + private Check caseConcurrent = new Check("async-concurrent1", Arrays.asList( + new AbstractMap.SimpleEntry<>(WhenCondition.class, true) + )); + + @Test + public void parseWhen() throws Exception { + assertTrue(caseAsync, FlowBus.getChain(caseAsync.getChainCode())); + + assertTrue(caseConcurrent, FlowBus.getChain(caseConcurrent.getChainCode())); + } + + private void assertTrue(Check check, Chain chain) { + Assert.assertNotNull(chain); + + Assert.assertTrue(null != chain.getConditionList() && !chain.getConditionList().isEmpty()); + for (int i = 0; i < chain.getConditionList().size(); i ++) { + + AbstractMap.SimpleEntry, Boolean> expected = check.getAsyncWithWhen().get(i); + Condition actual = chain.getConditionList().get(i); + + Assert.assertEquals(expected.getKey(), actual.getClass()); + if (actual.getClass().equals(WhenCondition.class)) { + Assert.assertEquals(expected.getValue(), ((WhenCondition) actual).isASync()); + } + } + } + + public static class Check { + private String chainCode; + private List, Boolean>> asyncWithWhen; + + public Check(String chainCode, List, Boolean>> asyncWithWhen) { + this.chainCode = chainCode; + this.asyncWithWhen = asyncWithWhen; + } + + public String getChainCode() { + return chainCode; + } + + public List, Boolean>> getAsyncWithWhen() { + return asyncWithWhen; + } + } +} diff --git a/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/TestRunFlow.java b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/TestRunFlow.java new file mode 100644 index 000000000..7ec971022 --- /dev/null +++ b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/TestRunFlow.java @@ -0,0 +1,81 @@ +package com.yomahub.flowtest.concurrent; + +import com.yomahub.liteflow.core.FlowExecutor; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.ActiveProfiles; +import org.springframework.test.context.junit4.SpringRunner; + +import javax.annotation.Resource; +import java.util.Arrays; +import java.util.List; +import java.util.UUID; +import java.util.stream.Collectors; + +import static com.yomahub.flowtest.concurrent.ConcurrentCase.caseAssertRandom; +import static com.yomahub.flowtest.concurrent.ConcurrentCase.caseInit; + +/** + * desc : + * name : TestRunFlow + * + * @author : xujia + * date : 2021/3/25 + * @since : 1.8 + */ +@ActiveProfiles("test") +@RunWith(SpringRunner.class) +@SpringBootTest +public class TestRunFlow { + + @Resource + private FlowExecutor flowExecutor; + + private String init(List steps) { + + String requestId = UUID.randomUUID().toString(); + + caseInit(requestId, steps.stream().map(ConcurrentCase.Routers::new).collect(Collectors.toList())); + + return requestId; + } + + @Test + public void mixedRunTest() throws Exception { + String requestId = init(Arrays.asList("s1", "s2", "s3", "s4", "s5", "s6", "p3", "p4", "p5", "p6", "p7", "p8")); + + flowExecutor.execute("async", requestId); + + caseAssertRandom(requestId); + } + + @Test + public void parallelTest() throws InterruptedException { + String requestId1 = init(Arrays.asList("c1", "c2", "c3", "c4", "c5")); + String requestId2 = init(Arrays.asList("c6", "c7", "c8", "c9", "c10")); + + List ts = Arrays.asList( + newExecutor("async-concurrent1", requestId1), + newExecutor("async-concurrent2", requestId2) + ); + ts.forEach(Thread::start); + + for (Thread t : ts) { + t.join(); + } + + caseAssertRandom(requestId1); + caseAssertRandom(requestId2); + } + + private Thread newExecutor(String chain, String requestId) { + return new Thread(() -> { + try { + flowExecutor.execute(chain, requestId); + } catch (Exception e) { + e.printStackTrace(); + } + }); + } +} diff --git a/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/c/C10Component.java b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/c/C10Component.java new file mode 100644 index 000000000..e92c245fc --- /dev/null +++ b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/c/C10Component.java @@ -0,0 +1,26 @@ +package com.yomahub.flowtest.concurrent.mock.component.c; + +import com.yomahub.flowtest.concurrent.ConcurrentCase; +import com.yomahub.liteflow.core.NodeComponent; +import org.springframework.stereotype.Component; + +/** + * desc : + * name : C10 + * + * @author : xujia + * date : 2021/3/25 + * @since : 1.8 + */ +@Component("c10") +public class C10Component extends NodeComponent { + + private static final String name = "c10"; + + @Override + public void process() throws Exception { + Thread.sleep(1_000); + ConcurrentCase.caseAdd((String) getSlot().getRequestData(), new ConcurrentCase.Routers(getSlotIndex(), name)); + System.out.println(String.format("[%s] component executed, index[%d].", name, getSlotIndex())); + } +} \ No newline at end of file diff --git a/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/c/C1Component.java b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/c/C1Component.java new file mode 100644 index 000000000..17d3d95f7 --- /dev/null +++ b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/c/C1Component.java @@ -0,0 +1,26 @@ +package com.yomahub.flowtest.concurrent.mock.component.c; + +import com.yomahub.flowtest.concurrent.ConcurrentCase; +import com.yomahub.liteflow.core.NodeComponent; +import org.springframework.stereotype.Component; + +/** + * desc : + * name : c1 + * + * @author : xujia + * date : 2021/3/25 + * @since : 1.8 + */ +@Component("c1") +public class C1Component extends NodeComponent { + + private static final String name = "c1"; + + @Override + public void process() throws Exception { + Thread.sleep(1_000); + ConcurrentCase.caseAdd((String) getSlot().getRequestData(), new ConcurrentCase.Routers(getSlotIndex(), name)); + System.out.println(String.format("[%s] component executed, index[%d].", name, getSlotIndex())); + } +} diff --git a/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/c/C2Component.java b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/c/C2Component.java new file mode 100644 index 000000000..82564577d --- /dev/null +++ b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/c/C2Component.java @@ -0,0 +1,26 @@ +package com.yomahub.flowtest.concurrent.mock.component.c; + +import com.yomahub.flowtest.concurrent.ConcurrentCase; +import com.yomahub.liteflow.core.NodeComponent; +import org.springframework.stereotype.Component; + +/** + * desc : + * name : C2 + * + * @author : xujia + * date : 2021/3/25 + * @since : 1.8 + */ +@Component("c2") +public class C2Component extends NodeComponent { + + private static final String name = "c2"; + + @Override + public void process() throws Exception { + Thread.sleep(1_000); + ConcurrentCase.caseAdd((String) getSlot().getRequestData(), new ConcurrentCase.Routers(getSlotIndex(), name)); + System.out.println(String.format("[%s] component executed, index[%d].", name, getSlotIndex())); + } +} diff --git a/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/c/C3Component.java b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/c/C3Component.java new file mode 100644 index 000000000..0be98bf76 --- /dev/null +++ b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/c/C3Component.java @@ -0,0 +1,26 @@ +package com.yomahub.flowtest.concurrent.mock.component.c; + +import com.yomahub.flowtest.concurrent.ConcurrentCase; +import com.yomahub.liteflow.core.NodeComponent; +import org.springframework.stereotype.Component; + +/** + * desc : + * name : C3Component + * + * @author : xujia + * date : 2021/3/25 + * @since : 1.8 + */ +@Component("c3") +public class C3Component extends NodeComponent { + + private static final String name = "c3"; + + @Override + public void process() throws Exception { + Thread.sleep(1_000); + ConcurrentCase.caseAdd((String) getSlot().getRequestData(), new ConcurrentCase.Routers(getSlotIndex(), name)); + System.out.println(String.format("[%s] component executed, index[%d].", name, getSlotIndex())); + } +} diff --git a/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/c/C4Component.java b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/c/C4Component.java new file mode 100644 index 000000000..d573fc658 --- /dev/null +++ b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/c/C4Component.java @@ -0,0 +1,26 @@ +package com.yomahub.flowtest.concurrent.mock.component.c; + +import com.yomahub.flowtest.concurrent.ConcurrentCase; +import com.yomahub.liteflow.core.NodeComponent; +import org.springframework.stereotype.Component; + +/** + * desc : + * name : C4 + * + * @author : xujia + * date : 2021/3/25 + * @since : 1.8 + */ +@Component("c4") +public class C4Component extends NodeComponent { + + private static final String name = "c4"; + + @Override + public void process() throws Exception { + Thread.sleep(1_000); + ConcurrentCase.caseAdd((String) getSlot().getRequestData(), new ConcurrentCase.Routers(getSlotIndex(), name)); + System.out.println(String.format("[%s] component executed, index[%d].", name, getSlotIndex())); + } +} \ No newline at end of file diff --git a/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/c/C5Component.java b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/c/C5Component.java new file mode 100644 index 000000000..29c0a7617 --- /dev/null +++ b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/c/C5Component.java @@ -0,0 +1,26 @@ +package com.yomahub.flowtest.concurrent.mock.component.c; + +import com.yomahub.flowtest.concurrent.ConcurrentCase; +import com.yomahub.liteflow.core.NodeComponent; +import org.springframework.stereotype.Component; + +/** + * desc : + * name : C5 + * + * @author : xujia + * date : 2021/3/25 + * @since : 1.8 + */ +@Component("c5") +public class C5Component extends NodeComponent { + + private static final String name = "c5"; + + @Override + public void process() throws Exception { + Thread.sleep(1_000); + ConcurrentCase.caseAdd((String) getSlot().getRequestData(), new ConcurrentCase.Routers(getSlotIndex(), name)); + System.out.println(String.format("[%s] component executed, index[%d].", name, getSlotIndex())); + } +} \ No newline at end of file diff --git a/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/c/C6Component.java b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/c/C6Component.java new file mode 100644 index 000000000..108190689 --- /dev/null +++ b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/c/C6Component.java @@ -0,0 +1,26 @@ +package com.yomahub.flowtest.concurrent.mock.component.c; + +import com.yomahub.flowtest.concurrent.ConcurrentCase; +import com.yomahub.liteflow.core.NodeComponent; +import org.springframework.stereotype.Component; + +/** + * desc : + * name : C6 + * + * @author : xujia + * date : 2021/3/25 + * @since : 1.8 + */ +@Component("c6") +public class C6Component extends NodeComponent { + + private static final String name = "c6"; + + @Override + public void process() throws Exception { + Thread.sleep(1_000); + ConcurrentCase.caseAdd((String) getSlot().getRequestData(), new ConcurrentCase.Routers(getSlotIndex(), name)); + System.out.println(String.format("[%s] component executed, index[%d].", name, getSlotIndex())); + } +} diff --git a/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/c/C7Component.java b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/c/C7Component.java new file mode 100644 index 000000000..17d9e6336 --- /dev/null +++ b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/c/C7Component.java @@ -0,0 +1,26 @@ +package com.yomahub.flowtest.concurrent.mock.component.c; + +import com.yomahub.flowtest.concurrent.ConcurrentCase; +import com.yomahub.liteflow.core.NodeComponent; +import org.springframework.stereotype.Component; + +/** + * desc : + * name : c7 + * + * @author : xujia + * date : 2021/3/25 + * @since : 1.8 + */ +@Component("c7") +public class C7Component extends NodeComponent { + + private static final String name = "c7"; + + @Override + public void process() throws Exception { + Thread.sleep(1_000); + ConcurrentCase.caseAdd((String) getSlot().getRequestData(), new ConcurrentCase.Routers(getSlotIndex(), name)); + System.out.println(String.format("[%s] component executed, index[%d].", name, getSlotIndex())); + } +} diff --git a/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/c/C8Component.java b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/c/C8Component.java new file mode 100644 index 000000000..eef38003a --- /dev/null +++ b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/c/C8Component.java @@ -0,0 +1,26 @@ +package com.yomahub.flowtest.concurrent.mock.component.c; + +import com.yomahub.flowtest.concurrent.ConcurrentCase; +import com.yomahub.liteflow.core.NodeComponent; +import org.springframework.stereotype.Component; + +/** + * desc : + * name : C8 + * + * @author : xujia + * date : 2021/3/25 + * @since : 1.8 + */ +@Component("c8") +public class C8Component extends NodeComponent { + + private static final String name = "c8"; + + @Override + public void process() throws Exception { + Thread.sleep(1_000); + ConcurrentCase.caseAdd((String) getSlot().getRequestData(), new ConcurrentCase.Routers(getSlotIndex(), name)); + System.out.println(String.format("[%s] component executed, index[%d].", name, getSlotIndex())); + } +} diff --git a/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/c/C9Component.java b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/c/C9Component.java new file mode 100644 index 000000000..7ee36f7d7 --- /dev/null +++ b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/c/C9Component.java @@ -0,0 +1,26 @@ +package com.yomahub.flowtest.concurrent.mock.component.c; + +import com.yomahub.flowtest.concurrent.ConcurrentCase; +import com.yomahub.liteflow.core.NodeComponent; +import org.springframework.stereotype.Component; + +/** + * desc : + * name : C9 + * + * @author : xujia + * date : 2021/3/25 + * @since : 1.8 + */ +@Component("c9") +public class C9Component extends NodeComponent { + + private static final String name = "c9"; + + @Override + public void process() throws Exception { + Thread.sleep(1_000); + ConcurrentCase.caseAdd((String) getSlot().getRequestData(), new ConcurrentCase.Routers(getSlotIndex(), name)); + System.out.println(String.format("[%s] component executed, index[%d].", name, getSlotIndex())); + } +} diff --git a/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/p/P3Component.java b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/p/P3Component.java new file mode 100644 index 000000000..eae5f65d1 --- /dev/null +++ b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/p/P3Component.java @@ -0,0 +1,25 @@ +package com.yomahub.flowtest.concurrent.mock.component.p; + +import com.yomahub.flowtest.concurrent.ConcurrentCase; +import com.yomahub.liteflow.core.NodeComponent; +import org.springframework.stereotype.Component; + +/** + * desc : + * name : P3Component + * + * @author : xujia + * date : 2021/3/25 + * @since : 1.8 + */ +@Component("p3") +public class P3Component extends NodeComponent { + + private static final String name = "p3"; + + @Override + public void process() throws Exception { + ConcurrentCase.caseAdd((String) getSlot().getRequestData(), new ConcurrentCase.Routers(getSlotIndex(), name)); + System.out.println(String.format("[%s] component executed, index[%d].", name, getSlotIndex())); + } +} diff --git a/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/p/P4Component.java b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/p/P4Component.java new file mode 100644 index 000000000..cf7bb188f --- /dev/null +++ b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/p/P4Component.java @@ -0,0 +1,25 @@ +package com.yomahub.flowtest.concurrent.mock.component.p; + +import com.yomahub.flowtest.concurrent.ConcurrentCase; +import com.yomahub.liteflow.core.NodeComponent; +import org.springframework.stereotype.Component; + +/** + * desc : + * name : P4Component + * + * @author : xujia + * date : 2021/3/25 + * @since : 1.8 + */ +@Component("p4") +public class P4Component extends NodeComponent { + + private static final String name = "p4"; + + @Override + public void process() throws Exception { + ConcurrentCase.caseAdd((String) getSlot().getRequestData(), new ConcurrentCase.Routers(getSlotIndex(), name)); + System.out.println(String.format("[%s] component executed, index[%d].", name, getSlotIndex())); + } +} diff --git a/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/p/P5Component.java b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/p/P5Component.java new file mode 100644 index 000000000..820944c86 --- /dev/null +++ b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/p/P5Component.java @@ -0,0 +1,25 @@ +package com.yomahub.flowtest.concurrent.mock.component.p; + +import com.yomahub.flowtest.concurrent.ConcurrentCase; +import com.yomahub.liteflow.core.NodeComponent; +import org.springframework.stereotype.Component; + +/** + * desc : + * name : P5 + * + * @author : xujia + * date : 2021/3/25 + * @since : 1.8 + */ +@Component("p5") +public class P5Component extends NodeComponent { + + private static final String name = "p5"; + + @Override + public void process() throws Exception { + ConcurrentCase.caseAdd((String) getSlot().getRequestData(), new ConcurrentCase.Routers(getSlotIndex(), name)); + System.out.println(String.format("[%s] component executed, index[%d].", name, getSlotIndex())); + } +} \ No newline at end of file diff --git a/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/p/P6Component.java b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/p/P6Component.java new file mode 100644 index 000000000..be94347e3 --- /dev/null +++ b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/p/P6Component.java @@ -0,0 +1,25 @@ +package com.yomahub.flowtest.concurrent.mock.component.p; + +import com.yomahub.flowtest.concurrent.ConcurrentCase; +import com.yomahub.liteflow.core.NodeComponent; +import org.springframework.stereotype.Component; + +/** + * desc : + * name : P6 + * + * @author : xujia + * date : 2021/3/25 + * @since : 1.8 + */ +@Component("p6") +public class P6Component extends NodeComponent { + + private static final String name = "p6"; + + @Override + public void process() throws Exception { + ConcurrentCase.caseAdd((String) getSlot().getRequestData(), new ConcurrentCase.Routers(getSlotIndex(), name)); + System.out.println(String.format("[%s] component executed, index[%d].", name, getSlotIndex())); + } +} diff --git a/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/p/P7Component.java b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/p/P7Component.java new file mode 100644 index 000000000..0c9edd806 --- /dev/null +++ b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/p/P7Component.java @@ -0,0 +1,25 @@ +package com.yomahub.flowtest.concurrent.mock.component.p; + +import com.yomahub.flowtest.concurrent.ConcurrentCase; +import com.yomahub.liteflow.core.NodeComponent; +import org.springframework.stereotype.Component; + +/** + * desc : + * name : P7Component + * + * @author : xujia + * date : 2021/3/25 + * @since : 1.8 + */ +@Component("p7") +public class P7Component extends NodeComponent { + + private static final String name = "p7"; + + @Override + public void process() throws Exception { + ConcurrentCase.caseAdd((String) getSlot().getRequestData(), new ConcurrentCase.Routers(getSlotIndex(), name)); + System.out.println(String.format("[%s] component executed, index[%d].", name, getSlotIndex())); + } +} diff --git a/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/p/P8Component.java b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/p/P8Component.java new file mode 100644 index 000000000..5bdf3a7cc --- /dev/null +++ b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/p/P8Component.java @@ -0,0 +1,25 @@ +package com.yomahub.flowtest.concurrent.mock.component.p; + +import com.yomahub.flowtest.concurrent.ConcurrentCase; +import com.yomahub.liteflow.core.NodeComponent; +import org.springframework.stereotype.Component; + +/** + * desc : + * name : P8Component + * + * @author : xujia + * date : 2021/3/25 + * @since : 1.8 + */ +@Component("p8") +public class P8Component extends NodeComponent { + + private static final String name = "p8"; + + @Override + public void process() throws Exception { + ConcurrentCase.caseAdd((String) getSlot().getRequestData(), new ConcurrentCase.Routers(getSlotIndex(), name)); + System.out.println(String.format("[%s] component executed, index[%d].", name, getSlotIndex())); + } +} \ No newline at end of file diff --git a/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/s/S1Component.java b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/s/S1Component.java new file mode 100644 index 000000000..01a33d474 --- /dev/null +++ b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/s/S1Component.java @@ -0,0 +1,25 @@ +package com.yomahub.flowtest.concurrent.mock.component.s; + +import com.yomahub.flowtest.concurrent.ConcurrentCase; +import com.yomahub.liteflow.core.NodeComponent; +import org.springframework.stereotype.Component; + +/** + * desc : + * name : S1Component + * + * @author : xujia + * date : 2021/3/25 + * @since : 1.8 + */ +@Component("s1") +public class S1Component extends NodeComponent { + + private static final String name = "s1"; + + @Override + public void process() throws Exception { + ConcurrentCase.caseAdd((String) getSlot().getRequestData(), new ConcurrentCase.Routers(getSlotIndex(), name)); + System.out.println(String.format("[%s] component executed, index[%d].", name, getSlotIndex())); + } +} diff --git a/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/s/S2Component.java b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/s/S2Component.java new file mode 100644 index 000000000..6a2aebd45 --- /dev/null +++ b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/s/S2Component.java @@ -0,0 +1,25 @@ +package com.yomahub.flowtest.concurrent.mock.component.s; + +import com.yomahub.flowtest.concurrent.ConcurrentCase; +import com.yomahub.liteflow.core.NodeComponent; +import org.springframework.stereotype.Component; + +/** + * desc : + * name : S2Component + * + * @author : xujia + * date : 2021/3/25 + * @since : 1.8 + */ +@Component("s2") +public class S2Component extends NodeComponent { + + private static final String name = "s2"; + + @Override + public void process() throws Exception { + ConcurrentCase.caseAdd((String) getSlot().getRequestData(), new ConcurrentCase.Routers(getSlotIndex(), name)); + System.out.println(String.format("[%s] component executed, index[%d].", name, getSlotIndex())); + } +} diff --git a/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/s/S3Component.java b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/s/S3Component.java new file mode 100644 index 000000000..df52250ae --- /dev/null +++ b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/s/S3Component.java @@ -0,0 +1,25 @@ +package com.yomahub.flowtest.concurrent.mock.component.s; + +import com.yomahub.flowtest.concurrent.ConcurrentCase; +import com.yomahub.liteflow.core.NodeComponent; +import org.springframework.stereotype.Component; + +/** + * desc : + * name : S3 + * + * @author : xujia + * date : 2021/3/25 + * @since : 1.8 + */ +@Component("s3") +public class S3Component extends NodeComponent { + + private static final String name = "s3"; + + @Override + public void process() throws Exception { + ConcurrentCase.caseAdd((String) getSlot().getRequestData(), new ConcurrentCase.Routers(getSlotIndex(), name)); + System.out.println(String.format("[%s] component executed, index[%d].", name, getSlotIndex())); + } +} \ No newline at end of file diff --git a/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/s/S4Component.java b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/s/S4Component.java new file mode 100644 index 000000000..a0cb7306a --- /dev/null +++ b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/s/S4Component.java @@ -0,0 +1,25 @@ +package com.yomahub.flowtest.concurrent.mock.component.s; + +import com.yomahub.flowtest.concurrent.ConcurrentCase; +import com.yomahub.liteflow.core.NodeComponent; +import org.springframework.stereotype.Component; + +/** + * desc : + * name : S4 + * + * @author : xujia + * date : 2021/3/25 + * @since : 1.8 + */ +@Component("s4") +public class S4Component extends NodeComponent { + + private static final String name = "s4"; + + @Override + public void process() throws Exception { + ConcurrentCase.caseAdd((String) getSlot().getRequestData(), new ConcurrentCase.Routers(getSlotIndex(), name)); + System.out.println(String.format("[%s] component executed, index[%d].", name, getSlotIndex())); + } +} diff --git a/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/s/S5Component.java b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/s/S5Component.java new file mode 100644 index 000000000..4e491d33b --- /dev/null +++ b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/s/S5Component.java @@ -0,0 +1,25 @@ +package com.yomahub.flowtest.concurrent.mock.component.s; + +import com.yomahub.flowtest.concurrent.ConcurrentCase; +import com.yomahub.liteflow.core.NodeComponent; +import org.springframework.stereotype.Component; + +/** + * desc : + * name : S5 + * + * @author : xujia + * date : 2021/3/25 + * @since : 1.8 + */ +@Component("s5") +public class S5Component extends NodeComponent { + + private static final String name = "s5"; + + @Override + public void process() throws Exception { + ConcurrentCase.caseAdd((String) getSlot().getRequestData(), new ConcurrentCase.Routers(getSlotIndex(), name)); + System.out.println(String.format("[%s] component executed, index[%d].", name, getSlotIndex())); + } +} \ No newline at end of file diff --git a/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/s/S6Component.java b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/s/S6Component.java new file mode 100644 index 000000000..d7cd20947 --- /dev/null +++ b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/s/S6Component.java @@ -0,0 +1,25 @@ +package com.yomahub.flowtest.concurrent.mock.component.s; + +import com.yomahub.flowtest.concurrent.ConcurrentCase; +import com.yomahub.liteflow.core.NodeComponent; +import org.springframework.stereotype.Component; + +/** + * desc : + * name : S6 + * + * @author : xujia + * date : 2021/3/25 + * @since : 1.8 + */ +@Component("s6") +public class S6Component extends NodeComponent { + + private static final String name = "s6"; + + @Override + public void process() throws Exception { + ConcurrentCase.caseAdd((String) getSlot().getRequestData(), new ConcurrentCase.Routers(getSlotIndex(), name)); + System.out.println(String.format("[%s] component executed, index[%d].", name, getSlotIndex())); + } +} diff --git a/liteflow-test-springboot/src/test/resources/application-test.yml b/liteflow-test-springboot/src/test/resources/application-test.yml new file mode 100644 index 000000000..a98fcc5ea --- /dev/null +++ b/liteflow-test-springboot/src/test/resources/application-test.yml @@ -0,0 +1,18 @@ +logging: + level: + root: debug + io: + lettuce: info + pattern: + console: "%d{yyyy-MM-dd} %d{hhh:mm:ss},%red(%d{SSS}) %green(%-5level) [%thread] %cyan(%logger{36}) : %msg%n" + +server: + port: 8086 + +liteflow: + rule-source: "config/flow-test.xml" + +threadPool: + parallel: + worker: 3 + queue: 512 diff --git a/liteflow-test-springboot/src/test/resources/config/flow-test.xml b/liteflow-test-springboot/src/test/resources/config/flow-test.xml new file mode 100644 index 000000000..13245b27d --- /dev/null +++ b/liteflow-test-springboot/src/test/resources/config/flow-test.xml @@ -0,0 +1,20 @@ + + + + + + + + + + + + + + + + + + + +