diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/core/FlowExecutor.java b/liteflow-core/src/main/java/com/yomahub/liteflow/core/FlowExecutor.java index 8774fa6c5..2a88da0b5 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/core/FlowExecutor.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/core/FlowExecutor.java @@ -31,6 +31,8 @@ import com.yomahub.liteflow.parser.XmlFlowParser; import com.yomahub.liteflow.parser.ZookeeperXmlFlowParser; import com.yomahub.liteflow.property.LiteflowConfig; +import java.util.concurrent.ExecutorService; + /** * 流程规则主要执行器类 * @author Bryan.Zhang @@ -180,4 +182,5 @@ public class FlowExecutor { public void setLiteflowConfig(LiteflowConfig liteflowConfig) { this.liteflowConfig = liteflowConfig; } + } diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/entity/data/DataBus.java b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/data/DataBus.java index 58a0cda91..93d9adb20 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/entity/data/DataBus.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/data/DataBus.java @@ -10,6 +10,7 @@ package com.yomahub.liteflow.entity.data; import java.util.concurrent.atomic.AtomicInteger; import cn.hutool.core.util.ObjectUtil; +import com.yomahub.liteflow.exception.ConfigErrorException; import com.yomahub.liteflow.property.LiteflowConfig; import com.yomahub.liteflow.util.SpringAware; import org.slf4j.Logger; @@ -29,12 +30,11 @@ public class DataBus { static { LiteflowConfig liteflowConfig = SpringAware.getBean(LiteflowConfig.class); - int slotSize = 1024; - if (ObjectUtil.isNotNull(liteflowConfig)){ - if (ObjectUtil.isNotNull(liteflowConfig.getSlotSize())){ - slotSize = liteflowConfig.getSlotSize(); - } + + if (ObjectUtil.isNull(liteflowConfig)){ + throw new ConfigErrorException("config error, please check liteflow config property"); } + int slotSize = liteflowConfig.getSlotSize(); slots = new Slot[slotSize]; } diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/Chain.java b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/Chain.java index 6fd7aa363..74d2a820e 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/Chain.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/Chain.java @@ -8,38 +8,51 @@ */ package com.yomahub.liteflow.entity.flow; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import cn.hutool.core.collection.CollUtil; import cn.hutool.core.util.ObjectUtil; - import com.yomahub.liteflow.entity.data.DataBus; import com.yomahub.liteflow.entity.data.Slot; import com.yomahub.liteflow.enums.ExecuteTypeEnum; import com.yomahub.liteflow.exception.FlowSystemException; import com.yomahub.liteflow.property.LiteflowConfig; import com.yomahub.liteflow.util.SpringAware; +import org.apache.commons.collections4.CollectionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; /** * chain对象,实现可执行器 * @author Bryan.Zhang */ public class Chain implements Executable { - + + private static final Logger LOG = LoggerFactory.getLogger(Chain.class); + private String chainName; private List conditionList; - private static int whenMaxWaitSeconds; + private static ExecutorService parallelExecutor; + + private static final LiteflowConfig liteflowConfig; static { - LiteflowConfig liteflowConfig = SpringAware.getBean(LiteflowConfig.class); - if (ObjectUtil.isNotNull(liteflowConfig)) { - whenMaxWaitSeconds = liteflowConfig.getWhenMaxWaitSeconds(); - } else { - whenMaxWaitSeconds = 15; + //这里liteflowConfig不可能为null + //如果在springboot环境,由于自动装配,所以不可能为null + //在spring环境,如果xml没配置,在FlowExecutor的init时候就已经报错了 + liteflowConfig = SpringAware.getBean(LiteflowConfig.class); + + //这里为了严谨,还是判断了下 + if (ObjectUtil.isNull(liteflowConfig)){ + throw new ConfigErrorException("config error, please check liteflow config property"); + } + + parallelExecutor = SpringAware.getBean(ExecutorService.class); + if (ObjectUtil.isNull(parallelExecutor)){ + parallelExecutor = ExecutorHelper.buildExecutor(liteflowConfig.getWhenMaxWorkers(), liteflowConfig.getWhenQueueLimit(), "liteflow-when-thread", false); } } @@ -85,11 +98,8 @@ public class Chain implements Executable { } } } else if (condition instanceof WhenCondition) { - final CountDownLatch latch = new CountDownLatch(condition.getNodeList().size()); - for (Executable executableItem : condition.getNodeList()) { - new WhenConditionThread(executableItem, slotIndex, slot.getRequestId(), latch).start(); - } - latch.await(whenMaxWaitSeconds, TimeUnit.SECONDS); + + executeAsyncCondition((WhenCondition) condition, slotIndex, slot.getRequestId()); } } } @@ -103,4 +113,58 @@ public class Chain implements Executable { public String getExecuteName() { return chainName; } + + + // 使用线程池执行when并发流程 + private void executeAsyncCondition(WhenCondition condition, Integer slotIndex, String requestId) { + final CountDownLatch latch = new CountDownLatch(condition.getNodeList().size()); + final List> futures = new ArrayList<>(condition.getNodeList().size()); + + + for (int i = 0; i < condition.getNodeList().size(); i++) { + futures.add(parallelExecutor.submit( + new ParallelCallable(condition.getNodeList().get(i), slotIndex, requestId, latch) + )); + } + + boolean interrupted = false; + try { + if (!latch.await(liteflowConfig.getWhenMaxWaitSeconds(), TimeUnit.SECONDS)) { + for (Future f : futures) { + f.cancel(true); + } + interrupted = true; + LOG.warn("requestId [{}] executing async condition has reached max-wait-seconds, condition canceled.", requestId); + } + } catch (InterruptedException e) { + interrupted = true; + } + + /** + * 当配置了errorResume = false,出现interrupted或者!f.get()的情况,将抛出WhenExecuteException + */ + if (!condition.isErrorResume()) { + if (interrupted) { + throw new WhenExecuteException(String.format( + "requestId [%s] when execute interrupted. errorResume [false].", requestId)); + } + + for (Future f : futures) { + try { + if (!f.get()) { + throw new WhenExecuteException(String.format( + "requestId [%s] when execute failed. errorResume [false].", requestId)); + } + } catch (InterruptedException | ExecutionException e) { + throw new WhenExecuteException(String.format( + "requestId [%s] when execute failed. errorResume [false].", requestId)); + } + } + + } else if (interrupted) { + // 这里由于配置了errorResume,所以只打印warn日志 + LOG.warn("requestId [{}] executing when condition timeout , but ignore with errorResume.", requestId); + } + + } } diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/WhenConditionThread.java b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/ParallelCallable.java similarity index 52% rename from liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/WhenConditionThread.java rename to liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/ParallelCallable.java index 43574f732..d956e29d3 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/WhenConditionThread.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/ParallelCallable.java @@ -2,6 +2,7 @@ package com.yomahub.liteflow.entity.flow; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; @@ -9,9 +10,9 @@ import java.util.concurrent.CountDownLatch; * 并行器线程 * @author Bryan.Zhang */ -public class WhenConditionThread extends Thread { +public class ParallelCallable implements Callable { - private static final Logger LOG = LoggerFactory.getLogger(WhenConditionThread.class); + private static final Logger LOG = LoggerFactory.getLogger(ParallelCallable.class); private Executable executableItem; @@ -21,7 +22,7 @@ public class WhenConditionThread extends Thread { private CountDownLatch latch; - public WhenConditionThread(Executable executableItem,Integer slotIndex,String requestId,CountDownLatch latch){ + public ParallelCallable(Executable executableItem, Integer slotIndex, String requestId, CountDownLatch latch) { this.executableItem = executableItem; this.slotIndex = slotIndex; this.requestId = requestId; @@ -29,11 +30,15 @@ public class WhenConditionThread extends Thread { } @Override - public void run() { - try{ + public Boolean call() throws Exception { + try { executableItem.execute(slotIndex); - } catch (Exception e) { - LOG.error("item [{}] execute cause error",executableItem.getExecuteName(),e); + + return true; + }catch(Exception e){ + LOG.error("requestId [{}], item [{}] execute error", requestId, executableItem.getExecuteName()); + + return false; } finally { latch.countDown(); } diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/WhenCondition.java b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/WhenCondition.java index fbcd9715e..13a202537 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/WhenCondition.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/WhenCondition.java @@ -14,9 +14,20 @@ import java.util.List; * @author Bryan.Zhang */ public class WhenCondition extends Condition{ + // 增加errorResume属性,以区分当when调用链调用失败时是否继续往下执行 + private boolean errorResume; public WhenCondition(List nodeList) { super(nodeList); + errorResume = true; } + public WhenCondition(List nodeList, boolean errorResume) { + super(nodeList); + this.errorResume = errorResume; + } + + public boolean isErrorResume() { + return errorResume; + } } diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/exception/WhenExecuteException.java b/liteflow-core/src/main/java/com/yomahub/liteflow/exception/WhenExecuteException.java new file mode 100644 index 000000000..ebe3f8662 --- /dev/null +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/exception/WhenExecuteException.java @@ -0,0 +1,21 @@ +package com.yomahub.liteflow.exception; + + +public class WhenExecuteException extends RuntimeException { + private static final long serialVersionUID = 1L; + + /** 异常信息 */ + private String message; + + public WhenExecuteException(String message) { + this.message = message; + } + + public String getMessage() { + return message; + } + + public void setMessage(String message) { + this.message = message; + } +} diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/monitor/MonitorBus.java b/liteflow-core/src/main/java/com/yomahub/liteflow/monitor/MonitorBus.java index b1333577b..d2109aaa1 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/monitor/MonitorBus.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/monitor/MonitorBus.java @@ -38,14 +38,6 @@ public class MonitorBus { private LiteflowConfig liteflowConfig; - private boolean enableLog = false; - - private int queueLimit = 200; - - private long delay = 300000; - - private long preiod = 300000; - private final Logger LOG = LoggerFactory.getLogger(this.getClass()); private final ConcurrentHashMap> statisticsMap = new ConcurrentHashMap<>(); @@ -53,25 +45,9 @@ public class MonitorBus { public MonitorBus(LiteflowConfig liteflowConfig) { this.liteflowConfig = liteflowConfig; - if (ObjectUtil.isNotNull(liteflowConfig.getEnableLog())){ - this.enableLog = liteflowConfig.getEnableLog(); - } - - if (ObjectUtil.isNotNull(liteflowConfig.getQueueLimit())){ - queueLimit = liteflowConfig.getQueueLimit(); - } - - if (ObjectUtil.isNotNull(liteflowConfig.getDelay())){ - delay = liteflowConfig.getDelay(); - } - - if (ObjectUtil.isNotNull(liteflowConfig.getPeriod())){ - preiod = liteflowConfig.getPeriod(); - } - - if(enableLog){ + if(liteflowConfig.getEnableLog()){ Timer timer = new Timer(); - timer.schedule(new MonitorTimeTask(this), delay, preiod); + timer.schedule(new MonitorTimeTask(this), liteflowConfig.getDelay(), liteflowConfig.getPeriod()); } } @@ -79,7 +55,7 @@ public class MonitorBus { if(statisticsMap.containsKey(statistics.getComponentClazzName())){ statisticsMap.get(statistics.getComponentClazzName()).add(statistics); }else{ - BoundedPriorityQueue queue = new BoundedPriorityQueue<>(queueLimit); + BoundedPriorityQueue queue = new BoundedPriorityQueue<>(liteflowConfig.getQueueLimit()); queue.offer(statistics); statisticsMap.put(statistics.getComponentClazzName(), queue); } diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/parser/XmlFlowParser.java b/liteflow-core/src/main/java/com/yomahub/liteflow/parser/XmlFlowParser.java index 66844eb12..9ef59c8fa 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/parser/XmlFlowParser.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/parser/XmlFlowParser.java @@ -7,26 +7,21 @@ import java.util.Map.Entry; import java.util.regex.Matcher; import java.util.regex.Pattern; +import cn.hutool.core.util.StrUtil; +import com.yomahub.liteflow.entity.flow.*; +import com.yomahub.liteflow.exception.ExecutableItemNotFoundException; +import com.yomahub.liteflow.exception.ParseException; +import com.yomahub.liteflow.util.SpringAware; +import org.apache.commons.lang3.StringUtils; import org.dom4j.Document; import org.dom4j.DocumentHelper; import org.dom4j.Element; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import cn.hutool.core.util.StrUtil; - import com.yomahub.liteflow.core.NodeComponent; -import com.yomahub.liteflow.entity.flow.Chain; -import com.yomahub.liteflow.entity.flow.Condition; -import com.yomahub.liteflow.entity.flow.Executable; -import com.yomahub.liteflow.entity.flow.Node; -import com.yomahub.liteflow.entity.flow.ThenCondition; -import com.yomahub.liteflow.entity.flow.WhenCondition; -import com.yomahub.liteflow.exception.ExecutableItemNotFoundException; -import com.yomahub.liteflow.exception.ParseException; import com.yomahub.liteflow.flow.FlowBus; import com.yomahub.liteflow.spring.ComponentScaner; -import com.yomahub.liteflow.util.SpringAware; /** * xml形式的解析器 @@ -144,7 +139,12 @@ public abstract class XmlFlowParser { if (condE.getName().equals("then")) { conditionList.add(new ThenCondition(chainNodeList)); } else if (condE.getName().equals("when")) { - conditionList.add(new WhenCondition(chainNodeList)); + Attribute errorResume = condE.attribute("errorResume"); + if (errorResume != null) { + conditionList.add(new WhenCondition(chainNodeList, errorResume.getValue().equals(Boolean.TRUE.toString()))); + } else { + conditionList.add(new WhenCondition(chainNodeList)); + } } } FlowBus.addChain(chainName, new Chain(chainName,conditionList)); diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/property/LiteflowConfig.java b/liteflow-core/src/main/java/com/yomahub/liteflow/property/LiteflowConfig.java index 513704519..736d2678d 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/property/LiteflowConfig.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/property/LiteflowConfig.java @@ -7,8 +7,13 @@ */ package com.yomahub.liteflow.property; +import cn.hutool.core.util.ObjectUtil; + /** * liteflow的配置实体类 + * 这个类中的属性为什么不用基本类型,而用包装类型呢 + * 是因为这个类是springboot和spring的最终参数获取器,考虑到spring的场景,有些参数不是必须配置。基本类型就会出现默认值的情况。 + * 所以为了要有null值出现,这里采用包装类型 * @author Bryan.Zhang */ public class LiteflowConfig { @@ -34,6 +39,12 @@ public class LiteflowConfig { //每隔多少秒打印 private Long period; + //异步线程池最大线程数 + private Integer whenMaxWorkers; + + //异步线程池最大队列数量 + private Integer whenQueueLimit; + public String getRuleSource() { return ruleSource; } @@ -43,7 +54,11 @@ public class LiteflowConfig { } public Integer getSlotSize() { - return slotSize; + if (ObjectUtil.isNull(slotSize)){ + return 1024; + }else{ + return slotSize; + } } public void setSlotSize(Integer slotSize) { @@ -51,7 +66,11 @@ public class LiteflowConfig { } public Integer getWhenMaxWaitSeconds() { - return whenMaxWaitSeconds; + if (ObjectUtil.isNull(whenMaxWaitSeconds)){ + return 15; + }else{ + return whenMaxWaitSeconds; + } } public void setWhenMaxWaitSeconds(Integer whenMaxWaitSeconds) { @@ -59,7 +78,11 @@ public class LiteflowConfig { } public Integer getQueueLimit() { - return queueLimit; + if (ObjectUtil.isNull(queueLimit)){ + return 200; + }else{ + return queueLimit; + } } public void setQueueLimit(Integer queueLimit) { @@ -67,7 +90,11 @@ public class LiteflowConfig { } public Long getDelay() { - return delay; + if (ObjectUtil.isNull(delay)){ + return 300000L; + }else{ + return delay; + } } public void setDelay(Long delay) { @@ -75,7 +102,11 @@ public class LiteflowConfig { } public Long getPeriod() { - return period; + if (ObjectUtil.isNull(period)){ + return 300000L; + }else{ + return period; + } } public void setPeriod(Long period) { @@ -83,10 +114,38 @@ public class LiteflowConfig { } public Boolean getEnableLog() { - return enableLog; + if (ObjectUtil.isNull(enableLog)){ + return false; + }else{ + return enableLog; + } } public void setEnableLog(Boolean enableLog) { this.enableLog = enableLog; } + + public Integer getWhenMaxWorkers() { + if (ObjectUtil.isNull(whenMaxWorkers)){ + return Runtime.getRuntime().availableProcessors() * 2; + }else{ + return whenMaxWorkers; + } + } + + public void setWhenMaxWorkers(Integer whenMaxWorkers) { + this.whenMaxWorkers = whenMaxWorkers; + } + + public Integer getWhenQueueLimit() { + if (ObjectUtil.isNull(whenQueueLimit)){ + return 512; + }else{ + return whenQueueLimit; + } + } + + public void setWhenQueueLimit(Integer whenQueueLimit) { + this.whenQueueLimit = whenQueueLimit; + } } diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/util/ExecutorHelper.java b/liteflow-core/src/main/java/com/yomahub/liteflow/util/ExecutorHelper.java new file mode 100644 index 000000000..e093869a7 --- /dev/null +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/util/ExecutorHelper.java @@ -0,0 +1,98 @@ +/** + *

Title: liteflow

+ *

Description: 轻量级的组件式流程框架

+ * @author Bryan.Zhang + * @email weenyc31@163.com + * @Date 2020/4/1 + */ +package com.yomahub.liteflow.util; + +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicLong; + + +/** + * 线程池工具类 + * @author Bryan.Zhang + */ +public class ExecutorHelper { + + private ExecutorHelper() { + } + + /** + * 使用默认的等待时间1分钟,来关闭目标线程组。 + *

+ * + * @param pool 需要关闭的线程组. + */ + public static void shutdownAwaitTermination(ExecutorService pool) { + shutdownAwaitTermination(pool, 60L); + } + + /** + * 关闭ExecutorService的线程管理者 + *

+ * + * @param pool 需要关闭的管理者 + * @param timeout 等待时间 + */ + public static void shutdownAwaitTermination(ExecutorService pool, + long timeout) { + pool.shutdown(); + try { + if (!pool.awaitTermination(timeout, TimeUnit.SECONDS)) { + pool.shutdownNow(); + if (!pool.awaitTermination(timeout, TimeUnit.SECONDS)) { + System.err.println("Pool did not terminate."); + } + } + } catch (InterruptedException ie) { + pool.shutdownNow(); + Thread.currentThread().interrupt(); + } + } + + /** + * 返回一个线程工厂,这是一个可以定义线程名字的线程工厂,返回的线程都将被命名. + * 创建的线程都是非后台线程. + * + * @param name 名称. + * @return 线程工厂实例. + */ + public static ThreadFactory buildExecutorFactory(final String name) { + return buildExecutorFactory(name, false); + } + + /** + * 返回一个线程工厂,这是一个可以定义线程名字的线程工厂,返回的线程都将被命名. + * + * @param name 名称. + * @param daemon 是否为后台线程. + * @return 线程工厂实例. + */ + public static ThreadFactory buildExecutorFactory(final String name, final boolean daemon) { + return new ThreadFactory() { + + private final AtomicLong number = new AtomicLong(); + + @Override + public Thread newThread(Runnable r) { + Thread newThread = Executors.defaultThreadFactory().newThread(r); + newThread.setName(name + "-" + number.getAndIncrement()); + newThread.setDaemon(daemon); + return newThread; + } + + }; + } + + public static ExecutorService buildExecutor(int worker, int queue, String namePrefix, boolean daemon) { + return new ThreadPoolExecutor(worker, worker, + 0L, TimeUnit.MILLISECONDS, + new ArrayBlockingQueue<>(queue), + buildExecutorFactory(namePrefix, daemon), + new ThreadPoolExecutor.AbortPolicy() + ); + } +} diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/util/Shutdown.java b/liteflow-core/src/main/java/com/yomahub/liteflow/util/Shutdown.java new file mode 100644 index 000000000..5076944c0 --- /dev/null +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/util/Shutdown.java @@ -0,0 +1,26 @@ +package com.yomahub.liteflow.util; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.PreDestroy; +import java.util.concurrent.ExecutorService; + +/** + * 关闭shutdown类 + * 执行清理工作 + * @author Bryan.Zhang + */ +public class Shutdown { + + private static final Logger LOG = LoggerFactory.getLogger(Shutdown.class); + + @PreDestroy + public void destroy() throws Exception { + ExecutorService executorService = SpringAware.getBean("whenExecutors"); + + LOG.info("Start closing the liteflow-when-calls..."); + ExecutorHelper.shutdownAwaitTermination(executorService); + LOG.info("Succeed closing the liteflow-when-calls ok..."); + } +} diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/util/SpringAware.java b/liteflow-core/src/main/java/com/yomahub/liteflow/util/SpringAware.java index 73a94d17b..65dca6221 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/util/SpringAware.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/util/SpringAware.java @@ -28,11 +28,21 @@ public class SpringAware implements ApplicationContextAware { } public static T getBean(String name) { - return (T) applicationContext.getBean(name); + try{ + T t = (T) applicationContext.getBean(name); + return t; + }catch (Exception e){ + return null; + } } public static T getBean(Class clazz) { - return applicationContext.getBean(clazz); + try{ + T t = applicationContext.getBean(clazz); + return t; + }catch (Exception e){ + return null; + } } public static T registerBean(Class c) { diff --git a/liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/LiteflowExecutorAutoConfiguration.java b/liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/LiteflowExecutorAutoConfiguration.java new file mode 100644 index 000000000..7c81f3cb2 --- /dev/null +++ b/liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/LiteflowExecutorAutoConfiguration.java @@ -0,0 +1,34 @@ +package com.yomahub.liteflow.springboot; + +import com.yomahub.liteflow.property.LiteflowConfig; +import com.yomahub.liteflow.util.ExecutorHelper; +import com.yomahub.liteflow.util.Shutdown; +import org.springframework.boot.autoconfigure.AutoConfigureAfter; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.util.concurrent.ExecutorService; + +/** + * 线程池装配类 + * 这个装配前置条件是需要LiteflowConfig,LiteflowPropertyAutoConfiguration以及SpringAware + * @author justin.xu + */ +@Configuration +@ConditionalOnBean(LiteflowConfig.class) +@AutoConfigureAfter({LiteflowPropertyAutoConfiguration.class}) +public class LiteflowExecutorAutoConfiguration { + + @Bean("whenExecutors") + public ExecutorService executorService(LiteflowConfig liteflowConfig) { + Integer useWorker = liteflowConfig.getWhenMaxWorkers(); + Integer useQueue = liteflowConfig.getWhenQueueLimit(); + return ExecutorHelper.buildExecutor(useWorker, useQueue, "liteflow-when-thead", false); + } + + @Bean + public Shutdown shutdown() { + return new Shutdown(); + } +} diff --git a/liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/LiteflowMainAutoConfiguration.java b/liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/LiteflowMainAutoConfiguration.java index bc1c72eb3..8662dc075 100644 --- a/liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/LiteflowMainAutoConfiguration.java +++ b/liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/LiteflowMainAutoConfiguration.java @@ -1,17 +1,25 @@ package com.yomahub.liteflow.springboot; import cn.hutool.core.util.StrUtil; - +import com.google.common.collect.Lists; import com.yomahub.liteflow.core.FlowExecutor; +import com.yomahub.liteflow.entity.data.DataBus; import com.yomahub.liteflow.monitor.MonitorBus; import com.yomahub.liteflow.property.LiteflowConfig; +import com.yomahub.liteflow.spring.ComponentScaner; import com.yomahub.liteflow.util.SpringAware; - +import org.apache.commons.lang3.StringUtils; import org.springframework.boot.autoconfigure.AutoConfigureAfter; import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Import; +import org.springframework.context.annotation.PropertySource; + +import javax.swing.*; +import java.util.List; /** * 主要的业务装配器 @@ -21,7 +29,7 @@ import org.springframework.context.annotation.Import; */ @Configuration @ConditionalOnBean(LiteflowConfig.class) -@AutoConfigureAfter(LiteflowPropertyAutoConfiguration.class) +@AutoConfigureAfter({LiteflowPropertyAutoConfiguration.class}) @Import(SpringAware.class) public class LiteflowMainAutoConfiguration { diff --git a/liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/LiteflowProperty.java b/liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/LiteflowProperty.java index 0a6ddbc09..8f538eb48 100644 --- a/liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/LiteflowProperty.java +++ b/liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/LiteflowProperty.java @@ -18,6 +18,12 @@ public class LiteflowProperty { //异步线程最大等待描述 private int whenMaxWaitSeconds; + //异步线程池最大线程数 + private int whenMaxWorkers; + + //异步线程池最大队列数量 + private int whenQueueLimit; + public String getRuleSource() { return ruleSource; } @@ -41,4 +47,20 @@ public class LiteflowProperty { public void setWhenMaxWaitSeconds(int whenMaxWaitSeconds) { this.whenMaxWaitSeconds = whenMaxWaitSeconds; } + + public int getWhenMaxWorkers() { + return whenMaxWorkers; + } + + public void setWhenMaxWorkers(int whenMaxWorkers) { + this.whenMaxWorkers = whenMaxWorkers; + } + + public int getWhenQueueLimit() { + return whenQueueLimit; + } + + public void setWhenQueueLimit(int whenQueueLimit) { + this.whenQueueLimit = whenQueueLimit; + } } diff --git a/liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/LiteflowPropertyAutoConfiguration.java b/liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/LiteflowPropertyAutoConfiguration.java index e43d14842..6d773f3e3 100644 --- a/liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/LiteflowPropertyAutoConfiguration.java +++ b/liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/LiteflowPropertyAutoConfiguration.java @@ -31,6 +31,8 @@ public class LiteflowPropertyAutoConfiguration { liteflowConfig.setQueueLimit(liteflowMonitorProperty.getQueueLimit()); liteflowConfig.setDelay(liteflowMonitorProperty.getDelay()); liteflowConfig.setPeriod(liteflowMonitorProperty.getPeriod()); + liteflowConfig.setWhenMaxWorkers(property.getWhenMaxWorkers()); + liteflowConfig.setWhenQueueLimit(property.getWhenQueueLimit()); return liteflowConfig; } } diff --git a/liteflow-spring-boot-starter/src/main/resources/META-INF/liteflow-default.properties b/liteflow-spring-boot-starter/src/main/resources/META-INF/liteflow-default.properties index ef7873807..60ac4ce71 100644 --- a/liteflow-spring-boot-starter/src/main/resources/META-INF/liteflow-default.properties +++ b/liteflow-spring-boot-starter/src/main/resources/META-INF/liteflow-default.properties @@ -1,7 +1,9 @@ liteflow.rule-source=config/flow.xml liteflow.slot-size=1024 liteflow.when-max-wait-second=15 +liteflow.when-max-workers=4 +liteflow.when-queue-limit=512 liteflow.monitor.enable-log=false liteflow.monitor.queue-limit=200 liteflow.monitor.delay=300000 -liteflow.monitor.period=300000 \ No newline at end of file +liteflow.monitor.period=300000 diff --git a/liteflow-spring-boot-starter/src/main/resources/META-INF/spring.factories b/liteflow-spring-boot-starter/src/main/resources/META-INF/spring.factories index 23669427e..85b579284 100644 --- a/liteflow-spring-boot-starter/src/main/resources/META-INF/spring.factories +++ b/liteflow-spring-boot-starter/src/main/resources/META-INF/spring.factories @@ -1,6 +1,7 @@ org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ com.yomahub.liteflow.springboot.LiteflowComponentScannerAutoConfiguration,\ com.yomahub.liteflow.springboot.LiteflowPropertyAutoConfiguration,\ + com.yomahub.liteflow.springboot.LiteflowExecutorAutoConfiguration,\ com.yomahub.liteflow.springboot.LiteflowMainAutoConfiguration diff --git a/liteflow-test-spring/src/main/resources/applicationContext.xml b/liteflow-test-spring/src/main/resources/applicationContext.xml index cb763da99..6c9fb5111 100644 --- a/liteflow-test-spring/src/main/resources/applicationContext.xml +++ b/liteflow-test-spring/src/main/resources/applicationContext.xml @@ -21,6 +21,8 @@ + + @@ -31,5 +33,6 @@ + diff --git a/liteflow-test-springboot/src/main/java/com/yomahub/flowtest/TestFlow.java b/liteflow-test-springboot/src/main/java/com/yomahub/flowtest/TestFlow.java index ba6286bf4..1e32a2726 100644 --- a/liteflow-test-springboot/src/main/java/com/yomahub/flowtest/TestFlow.java +++ b/liteflow-test-springboot/src/main/java/com/yomahub/flowtest/TestFlow.java @@ -17,7 +17,7 @@ public class TestFlow implements CommandLineRunner { @Override public void run(String... args) throws Exception { - Slot slot = flowExecutor.execute("chain2", "it's a request"); + Slot slot = flowExecutor.execute("chain1", "it's a request"); System.out.println(slot); } } diff --git a/liteflow-test-springboot/src/main/java/com/yomahub/flowtest/components/EComponent.java b/liteflow-test-springboot/src/main/java/com/yomahub/flowtest/components/EComponent.java index 62b9e2af3..8e87c67ac 100644 --- a/liteflow-test-springboot/src/main/java/com/yomahub/flowtest/components/EComponent.java +++ b/liteflow-test-springboot/src/main/java/com/yomahub/flowtest/components/EComponent.java @@ -17,13 +17,9 @@ public class EComponent extends NodeComponent { public void process() { try { Thread.sleep(120L); - System.out.println("E:" + this.getSlot().getOutput("a")); - this.getSlot().setOutput(this.getNodeId(), "E component output"); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Eomponent executed!"); - } - } diff --git a/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/ConcurrentCase.java b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/ConcurrentCase.java new file mode 100644 index 000000000..e50b26fe9 --- /dev/null +++ b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/ConcurrentCase.java @@ -0,0 +1,130 @@ +package com.yomahub.flowtest.concurrent; + +import org.junit.Assert; + +import java.util.AbstractMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; + +/** + * 流程的顺序执行、并发执行的CASE构造器 + * @author justin.xu + */ +public class ConcurrentCase { + public static final Map, List>> CASES = new ConcurrentHashMap<>(); + + /** + * 初始化一个测试用例的预期 + * @param request + * @param expected + */ + public static void caseInit(String request, List expected) { + CASES.put(request, new AbstractMap.SimpleEntry, List>(expected, new CopyOnWriteArrayList<>())); + } + + /** + * 添加这个测试用例的实际 + * @param request + * @param actual + */ + public static void caseAdd(String request, Routers actual) { + CASES.computeIfPresent(request, (k, v) -> { + v.getValue().add(actual); + return v; + }); + } + + /** + * 测试当前的Expected与Actual是否相同 + * + * @param request + */ + public static void caseAssert(String request) { + AbstractMap.SimpleEntry, List> ca = CASES.get(request); + Assert.assertNotNull(ca); + + Assert.assertEquals(ca.getKey(), ca.getValue()); + + if (ca.getValue().size() > 0) { + Integer expectedIndex = null; + for (Routers actual : ca.getValue()) { + + if (expectedIndex == null) { + expectedIndex = actual.getIndex(); + } else { + Assert.assertEquals(expectedIndex.intValue(), actual.getIndex()); + } + } + } + } + + /** + * 测试当前的Expected与Actual是否相同 + * + * @param request + */ + public static void caseAssertRandom(String request) { + AbstractMap.SimpleEntry, List> ca = CASES.get(request); + Assert.assertNotNull(ca); + + Assert.assertEquals(ca.getKey().size(), ca.getValue().size()); + + if (ca.getValue().size() > 0) { + Integer expectedIndex = null; + for (Routers actual : ca.getValue()) { + boolean find = false; + for(Routers routers : ca.getKey()) { + if (routers.getValue().equals(actual.getValue())) { + find = true; + } + } + Assert.assertTrue(find); + + if (expectedIndex == null) { + expectedIndex = actual.getIndex(); + } else { + Assert.assertEquals(expectedIndex.intValue(), actual.getIndex()); + } + } + } + } + + + public static class Routers { + int index; + String value; + + public Routers(String value) { + this.index = -1; + this.value = value; + } + public Routers(int index, String value) { + this.index = index; + this.value = value; + } + + public int getIndex() { + return index; + } + + public String getValue() { + return value; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Routers routers = (Routers) o; + return value.equals(routers.value); + } + + @Override + public int hashCode() { + return Objects.hash(value); + } + } +} diff --git a/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/SpringBootApp.java b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/SpringBootApp.java new file mode 100644 index 000000000..9846e2fb5 --- /dev/null +++ b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/SpringBootApp.java @@ -0,0 +1,20 @@ +package com.yomahub.flowtest.concurrent; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration; + +/** + * 启动类 + * @author justin.xu + */ +@SpringBootApplication(exclude={DataSourceAutoConfiguration.class}) +public class SpringBootApp { + /** + * @param args + */ + public static void main(String[] args) { + SpringApplication.run(SpringBootApp.class, args); + } + +} diff --git a/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/TestParseFlow.java b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/TestParseFlow.java new file mode 100644 index 000000000..ff647fbfb --- /dev/null +++ b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/TestParseFlow.java @@ -0,0 +1,85 @@ +package com.yomahub.flowtest.concurrent; + + +import com.yomahub.liteflow.entity.flow.Chain; +import com.yomahub.liteflow.entity.flow.Condition; +import com.yomahub.liteflow.entity.flow.ThenCondition; +import com.yomahub.liteflow.entity.flow.WhenCondition; +import com.yomahub.liteflow.flow.FlowBus; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.ActiveProfiles; +import org.springframework.test.context.junit4.SpringRunner; + +import java.util.AbstractMap; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + + +/** + * 测试流程的解析 + * @author justin.xu + */ +@ActiveProfiles("test") +@RunWith(SpringRunner.class) +@SpringBootTest +public class TestParseFlow { + + private Check caseErrorResume = new Check("test-errorResume", Arrays.asList( + new AbstractMap.SimpleEntry, Boolean>(ThenCondition.class, null), + new AbstractMap.SimpleEntry, Boolean>(WhenCondition.class, true), + new AbstractMap.SimpleEntry, Boolean>(WhenCondition.class, true), + new AbstractMap.SimpleEntry, Boolean>(WhenCondition.class, true) + )); + + private Check caseErrorBreak = new Check("test-errorBreak", Arrays.asList( + new AbstractMap.SimpleEntry, Boolean>(ThenCondition.class, null), + new AbstractMap.SimpleEntry, Boolean>(WhenCondition.class, true), + new AbstractMap.SimpleEntry, Boolean>(WhenCondition.class, false), + new AbstractMap.SimpleEntry, Boolean>(WhenCondition.class, true) + )); + + @Test + public void parseWhen() throws Exception { + assertTrue(caseErrorResume, FlowBus.getChain(caseErrorResume.getChainCode())); + + assertTrue(caseErrorBreak, FlowBus.getChain(caseErrorBreak.getChainCode())); + } + + private void assertTrue(Check check, Chain chain) { + Assert.assertNotNull(chain); + + Assert.assertTrue(null != chain.getConditionList() && !chain.getConditionList().isEmpty()); + for (int i = 0; i < chain.getConditionList().size(); i ++) { + + AbstractMap.SimpleEntry, Boolean> expected = check.getClazzWithFlags().get(i); + Condition actual = chain.getConditionList().get(i); + + Assert.assertEquals(expected.getKey(), actual.getClass()); + if (actual.getClass().equals(WhenCondition.class)) { + Assert.assertEquals(expected.getValue(), ((WhenCondition) actual).isErrorResume()); + } + } + } + + public static class Check { + private String chainCode; + private List, Boolean>> clazzWithFlags; + + public Check(String chainCode, List, Boolean>> clazzWithFlags) { + this.chainCode = chainCode; + this.clazzWithFlags = clazzWithFlags; + } + + public String getChainCode() { + return chainCode; + } + + public List, Boolean>> getClazzWithFlags() { + return clazzWithFlags; + } + } +} diff --git a/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/TestRunFlow.java b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/TestRunFlow.java new file mode 100644 index 000000000..fad9ca62c --- /dev/null +++ b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/TestRunFlow.java @@ -0,0 +1,90 @@ +package com.yomahub.flowtest.concurrent; + +import com.yomahub.liteflow.core.FlowExecutor; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.ActiveProfiles; +import org.springframework.test.context.junit4.SpringRunner; + +import javax.annotation.Resource; +import java.util.Arrays; +import java.util.List; +import java.util.UUID; +import java.util.stream.Collectors; + +import static com.yomahub.flowtest.concurrent.ConcurrentCase.caseAssertRandom; +import static com.yomahub.flowtest.concurrent.ConcurrentCase.caseInit; + +/** + * 测试流程的顺序执行、并发执行等 + * @author justin.xu + */ +@ActiveProfiles("test") +@RunWith(SpringRunner.class) +@SpringBootTest +public class TestRunFlow { + + @Resource + private FlowExecutor flowExecutor; + + private String init(List steps) { + + String requestId = UUID.randomUUID().toString(); + + caseInit(requestId, steps.stream().map(ConcurrentCase.Routers::new).collect(Collectors.toList())); + + return requestId; + } + + @Test + public void mixedRunByErrorResumeTest() throws Exception { + //由于errorResume,即使p5执行失败抛出异常, p7, p8也将会执行 + String requestId = init(Arrays.asList("s1", "s2", "s3", "s4", "s5", "s6", "p3", "p4", "p6", "p7", "p8")); + + flowExecutor.execute("test-errorResume", requestId); + + caseAssertRandom(requestId); + } + + + @Test + public void mixedRunByErrorBreakTest() throws Exception { + //由于errorBreak,p5执行失败抛出异常, p7, p8将不会执行 + String requestId = init(Arrays.asList("s1", "s2", "s3", "s4", "s5", "s6", "p3", "p4", "p6")); + + flowExecutor.execute("test-errorBreak", requestId); + + caseAssertRandom(requestId); + } + + @Test + public void parallelTest() throws InterruptedException { + //测试2个线程并发时,所执行的序列是正常的,线程安全的(slotIndex在每个执行序列chain中都是不变的) + String requestId1 = init(Arrays.asList("c1", "c2", "c3", "c4", "c5")); + String requestId2 = init(Arrays.asList("c6", "c7", "c8", "c9", "c10")); + + List ts = Arrays.asList( + newExecutor("async-concurrent1", requestId1), + newExecutor("async-concurrent2", requestId2) + ); + ts.forEach(Thread::start); + + for (Thread t : ts) { + t.join(); + } + + caseAssertRandom(requestId1); + caseAssertRandom(requestId2); + } + + private Thread newExecutor(String chain, String requestId) { + return new Thread(() -> { + try { + flowExecutor.execute(chain, requestId); + } catch (Exception e) { + e.printStackTrace(); + } + }); + } +} diff --git a/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/c/C10Component.java b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/c/C10Component.java new file mode 100644 index 000000000..360878450 --- /dev/null +++ b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/c/C10Component.java @@ -0,0 +1,22 @@ +package com.yomahub.flowtest.concurrent.mock.component.c; + +import com.yomahub.flowtest.concurrent.ConcurrentCase; +import com.yomahub.liteflow.core.NodeComponent; +import org.springframework.stereotype.Component; + +/** + * 测试mock component + * @author justin.xu + */ +@Component("c10") +public class C10Component extends NodeComponent { + + private static final String name = "c10"; + + @Override + public void process() throws Exception { + Thread.sleep(1_000); + ConcurrentCase.caseAdd((String) getSlot().getRequestData(), new ConcurrentCase.Routers(getSlotIndex(), name)); + System.out.println(String.format("[%s] component executed, index[%d].", name, getSlotIndex())); + } +} \ No newline at end of file diff --git a/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/c/C1Component.java b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/c/C1Component.java new file mode 100644 index 000000000..641a67629 --- /dev/null +++ b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/c/C1Component.java @@ -0,0 +1,22 @@ +package com.yomahub.flowtest.concurrent.mock.component.c; + +import com.yomahub.flowtest.concurrent.ConcurrentCase; +import com.yomahub.liteflow.core.NodeComponent; +import org.springframework.stereotype.Component; + +/** + * 测试mock component + * @author justin.xu + */ +@Component("c1") +public class C1Component extends NodeComponent { + + private static final String name = "c1"; + + @Override + public void process() throws Exception { + Thread.sleep(1_000); + ConcurrentCase.caseAdd((String) getSlot().getRequestData(), new ConcurrentCase.Routers(getSlotIndex(), name)); + System.out.println(String.format("[%s] component executed, index[%d].", name, getSlotIndex())); + } +} diff --git a/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/c/C2Component.java b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/c/C2Component.java new file mode 100644 index 000000000..7aa9ea9ab --- /dev/null +++ b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/c/C2Component.java @@ -0,0 +1,22 @@ +package com.yomahub.flowtest.concurrent.mock.component.c; + +import com.yomahub.flowtest.concurrent.ConcurrentCase; +import com.yomahub.liteflow.core.NodeComponent; +import org.springframework.stereotype.Component; + +/** + * 测试mock component + * @author justin.xu + */ +@Component("c2") +public class C2Component extends NodeComponent { + + private static final String name = "c2"; + + @Override + public void process() throws Exception { + Thread.sleep(1_000); + ConcurrentCase.caseAdd((String) getSlot().getRequestData(), new ConcurrentCase.Routers(getSlotIndex(), name)); + System.out.println(String.format("[%s] component executed, index[%d].", name, getSlotIndex())); + } +} diff --git a/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/c/C3Component.java b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/c/C3Component.java new file mode 100644 index 000000000..104d2e1fd --- /dev/null +++ b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/c/C3Component.java @@ -0,0 +1,22 @@ +package com.yomahub.flowtest.concurrent.mock.component.c; + +import com.yomahub.flowtest.concurrent.ConcurrentCase; +import com.yomahub.liteflow.core.NodeComponent; +import org.springframework.stereotype.Component; + +/** + * 测试mock component + * @author justin.xu + */ +@Component("c3") +public class C3Component extends NodeComponent { + + private static final String name = "c3"; + + @Override + public void process() throws Exception { + Thread.sleep(1_000); + ConcurrentCase.caseAdd((String) getSlot().getRequestData(), new ConcurrentCase.Routers(getSlotIndex(), name)); + System.out.println(String.format("[%s] component executed, index[%d].", name, getSlotIndex())); + } +} diff --git a/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/c/C4Component.java b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/c/C4Component.java new file mode 100644 index 000000000..207b20309 --- /dev/null +++ b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/c/C4Component.java @@ -0,0 +1,22 @@ +package com.yomahub.flowtest.concurrent.mock.component.c; + +import com.yomahub.flowtest.concurrent.ConcurrentCase; +import com.yomahub.liteflow.core.NodeComponent; +import org.springframework.stereotype.Component; + +/** + * 测试mock component + * @author justin.xu + */ +@Component("c4") +public class C4Component extends NodeComponent { + + private static final String name = "c4"; + + @Override + public void process() throws Exception { + Thread.sleep(1_000); + ConcurrentCase.caseAdd((String) getSlot().getRequestData(), new ConcurrentCase.Routers(getSlotIndex(), name)); + System.out.println(String.format("[%s] component executed, index[%d].", name, getSlotIndex())); + } +} \ No newline at end of file diff --git a/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/c/C5Component.java b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/c/C5Component.java new file mode 100644 index 000000000..a5877c041 --- /dev/null +++ b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/c/C5Component.java @@ -0,0 +1,22 @@ +package com.yomahub.flowtest.concurrent.mock.component.c; + +import com.yomahub.flowtest.concurrent.ConcurrentCase; +import com.yomahub.liteflow.core.NodeComponent; +import org.springframework.stereotype.Component; + +/** + * 测试mock component + * @author justin.xu + */ +@Component("c5") +public class C5Component extends NodeComponent { + + private static final String name = "c5"; + + @Override + public void process() throws Exception { + Thread.sleep(1_000); + ConcurrentCase.caseAdd((String) getSlot().getRequestData(), new ConcurrentCase.Routers(getSlotIndex(), name)); + System.out.println(String.format("[%s] component executed, index[%d].", name, getSlotIndex())); + } +} \ No newline at end of file diff --git a/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/c/C6Component.java b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/c/C6Component.java new file mode 100644 index 000000000..af3bca011 --- /dev/null +++ b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/c/C6Component.java @@ -0,0 +1,22 @@ +package com.yomahub.flowtest.concurrent.mock.component.c; + +import com.yomahub.flowtest.concurrent.ConcurrentCase; +import com.yomahub.liteflow.core.NodeComponent; +import org.springframework.stereotype.Component; + +/** + * 测试mock component + * @author justin.xu + */ +@Component("c6") +public class C6Component extends NodeComponent { + + private static final String name = "c6"; + + @Override + public void process() throws Exception { + Thread.sleep(1_000); + ConcurrentCase.caseAdd((String) getSlot().getRequestData(), new ConcurrentCase.Routers(getSlotIndex(), name)); + System.out.println(String.format("[%s] component executed, index[%d].", name, getSlotIndex())); + } +} diff --git a/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/c/C7Component.java b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/c/C7Component.java new file mode 100644 index 000000000..30f47d0d1 --- /dev/null +++ b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/c/C7Component.java @@ -0,0 +1,22 @@ +package com.yomahub.flowtest.concurrent.mock.component.c; + +import com.yomahub.flowtest.concurrent.ConcurrentCase; +import com.yomahub.liteflow.core.NodeComponent; +import org.springframework.stereotype.Component; + +/** + * 测试mock component + * @author justin.xu + */ +@Component("c7") +public class C7Component extends NodeComponent { + + private static final String name = "c7"; + + @Override + public void process() throws Exception { + Thread.sleep(1_000); + ConcurrentCase.caseAdd((String) getSlot().getRequestData(), new ConcurrentCase.Routers(getSlotIndex(), name)); + System.out.println(String.format("[%s] component executed, index[%d].", name, getSlotIndex())); + } +} diff --git a/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/c/C8Component.java b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/c/C8Component.java new file mode 100644 index 000000000..ce9bb91e2 --- /dev/null +++ b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/c/C8Component.java @@ -0,0 +1,22 @@ +package com.yomahub.flowtest.concurrent.mock.component.c; + +import com.yomahub.flowtest.concurrent.ConcurrentCase; +import com.yomahub.liteflow.core.NodeComponent; +import org.springframework.stereotype.Component; + +/** + * 测试mock component + * @author justin.xu + */ +@Component("c8") +public class C8Component extends NodeComponent { + + private static final String name = "c8"; + + @Override + public void process() throws Exception { + Thread.sleep(1_000); + ConcurrentCase.caseAdd((String) getSlot().getRequestData(), new ConcurrentCase.Routers(getSlotIndex(), name)); + System.out.println(String.format("[%s] component executed, index[%d].", name, getSlotIndex())); + } +} diff --git a/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/c/C9Component.java b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/c/C9Component.java new file mode 100644 index 000000000..f770ac294 --- /dev/null +++ b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/c/C9Component.java @@ -0,0 +1,22 @@ +package com.yomahub.flowtest.concurrent.mock.component.c; + +import com.yomahub.flowtest.concurrent.ConcurrentCase; +import com.yomahub.liteflow.core.NodeComponent; +import org.springframework.stereotype.Component; + +/** + * 测试mock component + * @author justin.xu + */ +@Component("c9") +public class C9Component extends NodeComponent { + + private static final String name = "c9"; + + @Override + public void process() throws Exception { + Thread.sleep(1_000); + ConcurrentCase.caseAdd((String) getSlot().getRequestData(), new ConcurrentCase.Routers(getSlotIndex(), name)); + System.out.println(String.format("[%s] component executed, index[%d].", name, getSlotIndex())); + } +} diff --git a/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/p/P3Component.java b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/p/P3Component.java new file mode 100644 index 000000000..eaaa96423 --- /dev/null +++ b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/p/P3Component.java @@ -0,0 +1,21 @@ +package com.yomahub.flowtest.concurrent.mock.component.p; + +import com.yomahub.flowtest.concurrent.ConcurrentCase; +import com.yomahub.liteflow.core.NodeComponent; +import org.springframework.stereotype.Component; + +/** + * 测试mock component + * @author justin.xu + */ +@Component("p3") +public class P3Component extends NodeComponent { + + private static final String name = "p3"; + + @Override + public void process() throws Exception { + ConcurrentCase.caseAdd((String) getSlot().getRequestData(), new ConcurrentCase.Routers(getSlotIndex(), name)); + System.out.println(String.format("[%s] component executed, index[%d].", name, getSlotIndex())); + } +} diff --git a/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/p/P4Component.java b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/p/P4Component.java new file mode 100644 index 000000000..4b22bb1ad --- /dev/null +++ b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/p/P4Component.java @@ -0,0 +1,21 @@ +package com.yomahub.flowtest.concurrent.mock.component.p; + +import com.yomahub.flowtest.concurrent.ConcurrentCase; +import com.yomahub.liteflow.core.NodeComponent; +import org.springframework.stereotype.Component; + +/** + * 测试mock component + * @author justin.xu + */ +@Component("p4") +public class P4Component extends NodeComponent { + + private static final String name = "p4"; + + @Override + public void process() throws Exception { + ConcurrentCase.caseAdd((String) getSlot().getRequestData(), new ConcurrentCase.Routers(getSlotIndex(), name)); + System.out.println(String.format("[%s] component executed, index[%d].", name, getSlotIndex())); + } +} diff --git a/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/p/P5Component.java b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/p/P5Component.java new file mode 100644 index 000000000..c4463fc26 --- /dev/null +++ b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/p/P5Component.java @@ -0,0 +1,19 @@ +package com.yomahub.flowtest.concurrent.mock.component.p; + +import com.yomahub.liteflow.core.NodeComponent; +import org.springframework.stereotype.Component; + +/** + * 测试mock component + * @author justin.xu + */ +@Component("p5") +public class P5Component extends NodeComponent { + + private static final String name = "p5"; + + @Override + public void process() throws Exception { + throw new RuntimeException(String.format("test mock error [%s]", name)); + } +} \ No newline at end of file diff --git a/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/p/P6Component.java b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/p/P6Component.java new file mode 100644 index 000000000..a17f1ffcd --- /dev/null +++ b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/p/P6Component.java @@ -0,0 +1,21 @@ +package com.yomahub.flowtest.concurrent.mock.component.p; + +import com.yomahub.flowtest.concurrent.ConcurrentCase; +import com.yomahub.liteflow.core.NodeComponent; +import org.springframework.stereotype.Component; + +/** + * 测试mock component + * @author justin.xu + */ +@Component("p6") +public class P6Component extends NodeComponent { + + private static final String name = "p6"; + + @Override + public void process() throws Exception { + ConcurrentCase.caseAdd((String) getSlot().getRequestData(), new ConcurrentCase.Routers(getSlotIndex(), name)); + System.out.println(String.format("[%s] component executed, index[%d].", name, getSlotIndex())); + } +} diff --git a/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/p/P7Component.java b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/p/P7Component.java new file mode 100644 index 000000000..da1bfa819 --- /dev/null +++ b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/p/P7Component.java @@ -0,0 +1,21 @@ +package com.yomahub.flowtest.concurrent.mock.component.p; + +import com.yomahub.flowtest.concurrent.ConcurrentCase; +import com.yomahub.liteflow.core.NodeComponent; +import org.springframework.stereotype.Component; + +/** + * 测试mock component + * @author justin.xu + */ +@Component("p7") +public class P7Component extends NodeComponent { + + private static final String name = "p7"; + + @Override + public void process() throws Exception { + ConcurrentCase.caseAdd((String) getSlot().getRequestData(), new ConcurrentCase.Routers(getSlotIndex(), name)); + System.out.println(String.format("[%s] component executed, index[%d].", name, getSlotIndex())); + } +} diff --git a/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/p/P8Component.java b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/p/P8Component.java new file mode 100644 index 000000000..12f612e95 --- /dev/null +++ b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/p/P8Component.java @@ -0,0 +1,21 @@ +package com.yomahub.flowtest.concurrent.mock.component.p; + +import com.yomahub.flowtest.concurrent.ConcurrentCase; +import com.yomahub.liteflow.core.NodeComponent; +import org.springframework.stereotype.Component; + +/** + * 测试mock component + * @author justin.xu + */ +@Component("p8") +public class P8Component extends NodeComponent { + + private static final String name = "p8"; + + @Override + public void process() throws Exception { + ConcurrentCase.caseAdd((String) getSlot().getRequestData(), new ConcurrentCase.Routers(getSlotIndex(), name)); + System.out.println(String.format("[%s] component executed, index[%d].", name, getSlotIndex())); + } +} \ No newline at end of file diff --git a/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/s/S1Component.java b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/s/S1Component.java new file mode 100644 index 000000000..e78c15f35 --- /dev/null +++ b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/s/S1Component.java @@ -0,0 +1,21 @@ +package com.yomahub.flowtest.concurrent.mock.component.s; + +import com.yomahub.flowtest.concurrent.ConcurrentCase; +import com.yomahub.liteflow.core.NodeComponent; +import org.springframework.stereotype.Component; + +/** + * 测试mock component + * @author justin.xu + */ +@Component("s1") +public class S1Component extends NodeComponent { + + private static final String name = "s1"; + + @Override + public void process() throws Exception { + ConcurrentCase.caseAdd((String) getSlot().getRequestData(), new ConcurrentCase.Routers(getSlotIndex(), name)); + System.out.println(String.format("[%s] component executed, index[%d].", name, getSlotIndex())); + } +} diff --git a/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/s/S2Component.java b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/s/S2Component.java new file mode 100644 index 000000000..5d67adea1 --- /dev/null +++ b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/s/S2Component.java @@ -0,0 +1,21 @@ +package com.yomahub.flowtest.concurrent.mock.component.s; + +import com.yomahub.flowtest.concurrent.ConcurrentCase; +import com.yomahub.liteflow.core.NodeComponent; +import org.springframework.stereotype.Component; + +/** + * 测试mock component + * @author justin.xu + */ +@Component("s2") +public class S2Component extends NodeComponent { + + private static final String name = "s2"; + + @Override + public void process() throws Exception { + ConcurrentCase.caseAdd((String) getSlot().getRequestData(), new ConcurrentCase.Routers(getSlotIndex(), name)); + System.out.println(String.format("[%s] component executed, index[%d].", name, getSlotIndex())); + } +} diff --git a/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/s/S3Component.java b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/s/S3Component.java new file mode 100644 index 000000000..d85ca5822 --- /dev/null +++ b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/s/S3Component.java @@ -0,0 +1,21 @@ +package com.yomahub.flowtest.concurrent.mock.component.s; + +import com.yomahub.flowtest.concurrent.ConcurrentCase; +import com.yomahub.liteflow.core.NodeComponent; +import org.springframework.stereotype.Component; + +/** + * 测试mock component + * @author justin.xu + */ +@Component("s3") +public class S3Component extends NodeComponent { + + private static final String name = "s3"; + + @Override + public void process() throws Exception { + ConcurrentCase.caseAdd((String) getSlot().getRequestData(), new ConcurrentCase.Routers(getSlotIndex(), name)); + System.out.println(String.format("[%s] component executed, index[%d].", name, getSlotIndex())); + } +} \ No newline at end of file diff --git a/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/s/S4Component.java b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/s/S4Component.java new file mode 100644 index 000000000..47bec234d --- /dev/null +++ b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/s/S4Component.java @@ -0,0 +1,21 @@ +package com.yomahub.flowtest.concurrent.mock.component.s; + +import com.yomahub.flowtest.concurrent.ConcurrentCase; +import com.yomahub.liteflow.core.NodeComponent; +import org.springframework.stereotype.Component; + +/** + * 测试mock component + * @author justin.xu + */ +@Component("s4") +public class S4Component extends NodeComponent { + + private static final String name = "s4"; + + @Override + public void process() throws Exception { + ConcurrentCase.caseAdd((String) getSlot().getRequestData(), new ConcurrentCase.Routers(getSlotIndex(), name)); + System.out.println(String.format("[%s] component executed, index[%d].", name, getSlotIndex())); + } +} diff --git a/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/s/S5Component.java b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/s/S5Component.java new file mode 100644 index 000000000..4c20855dc --- /dev/null +++ b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/s/S5Component.java @@ -0,0 +1,21 @@ +package com.yomahub.flowtest.concurrent.mock.component.s; + +import com.yomahub.flowtest.concurrent.ConcurrentCase; +import com.yomahub.liteflow.core.NodeComponent; +import org.springframework.stereotype.Component; + +/** + * 测试mock component + * @author justin.xu + */ +@Component("s5") +public class S5Component extends NodeComponent { + + private static final String name = "s5"; + + @Override + public void process() throws Exception { + ConcurrentCase.caseAdd((String) getSlot().getRequestData(), new ConcurrentCase.Routers(getSlotIndex(), name)); + System.out.println(String.format("[%s] component executed, index[%d].", name, getSlotIndex())); + } +} \ No newline at end of file diff --git a/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/s/S6Component.java b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/s/S6Component.java new file mode 100644 index 000000000..ee2578b6c --- /dev/null +++ b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/s/S6Component.java @@ -0,0 +1,21 @@ +package com.yomahub.flowtest.concurrent.mock.component.s; + +import com.yomahub.flowtest.concurrent.ConcurrentCase; +import com.yomahub.liteflow.core.NodeComponent; +import org.springframework.stereotype.Component; + +/** + * 测试mock component + * @author justin.xu + */ +@Component("s6") +public class S6Component extends NodeComponent { + + private static final String name = "s6"; + + @Override + public void process() throws Exception { + ConcurrentCase.caseAdd((String) getSlot().getRequestData(), new ConcurrentCase.Routers(getSlotIndex(), name)); + System.out.println(String.format("[%s] component executed, index[%d].", name, getSlotIndex())); + } +} diff --git a/liteflow-test-springboot/src/test/resources/application-test.yml b/liteflow-test-springboot/src/test/resources/application-test.yml new file mode 100644 index 000000000..a98fcc5ea --- /dev/null +++ b/liteflow-test-springboot/src/test/resources/application-test.yml @@ -0,0 +1,18 @@ +logging: + level: + root: debug + io: + lettuce: info + pattern: + console: "%d{yyyy-MM-dd} %d{hhh:mm:ss},%red(%d{SSS}) %green(%-5level) [%thread] %cyan(%logger{36}) : %msg%n" + +server: + port: 8086 + +liteflow: + rule-source: "config/flow-test.xml" + +threadPool: + parallel: + worker: 3 + queue: 512 diff --git a/liteflow-test-springboot/src/test/resources/config/flow-test.xml b/liteflow-test-springboot/src/test/resources/config/flow-test.xml new file mode 100644 index 000000000..7bdc30870 --- /dev/null +++ b/liteflow-test-springboot/src/test/resources/config/flow-test.xml @@ -0,0 +1,27 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + +