diff --git a/liteflow-core/pom.xml b/liteflow-core/pom.xml index b7cbd1f22..8dab334f8 100644 --- a/liteflow-core/pom.xml +++ b/liteflow-core/pom.xml @@ -13,18 +13,6 @@ - - org.apache.commons - commons-lang3 - - - org.apache.commons - commons-collections4 - - - commons-io - commons-io - cn.hutool hutool-core @@ -73,10 +61,6 @@ junit junit - - - commons-logging - commons-logging org.apache.curator @@ -96,5 +80,9 @@ org.apache.curator curator-recipes + + com.alibaba + transmittable-thread-local + diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/core/FlowExecutor.java b/liteflow-core/src/main/java/com/yomahub/liteflow/core/FlowExecutor.java index 990653fa7..96e5bd97a 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/core/FlowExecutor.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/core/FlowExecutor.java @@ -7,12 +7,8 @@ */ package com.yomahub.liteflow.core; -import java.text.MessageFormat; -import java.util.List; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - import cn.hutool.core.util.ObjectUtil; +import cn.hutool.core.util.ReUtil; import cn.hutool.core.util.StrUtil; import com.google.common.collect.Lists; import com.yomahub.liteflow.exception.ConfigErrorException; @@ -25,11 +21,23 @@ import org.slf4j.LoggerFactory; import com.yomahub.liteflow.entity.flow.Chain; import com.yomahub.liteflow.entity.data.DataBus; import com.yomahub.liteflow.entity.data.DefaultSlot; +import com.yomahub.liteflow.entity.data.LiteflowResponse; import com.yomahub.liteflow.entity.data.Slot; +import com.yomahub.liteflow.entity.flow.Chain; import com.yomahub.liteflow.exception.ChainNotFoundException; +import com.yomahub.liteflow.exception.ConfigErrorException; import com.yomahub.liteflow.exception.FlowExecutorNotInitException; import com.yomahub.liteflow.exception.NoAvailableSlotException; import com.yomahub.liteflow.flow.FlowBus; +import com.yomahub.liteflow.parser.LocalXmlFlowParser; +import com.yomahub.liteflow.parser.XmlFlowParser; +import com.yomahub.liteflow.parser.ZookeeperXmlFlowParser; +import com.yomahub.liteflow.property.LiteflowConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.text.MessageFormat; +import java.util.List; /** * 流程规则主要执行器类 @@ -39,6 +47,12 @@ public class FlowExecutor { private static final Logger LOG = LoggerFactory.getLogger(FlowExecutor.class); + private static final String ZK_CONFIG_REGEX = "[\\w\\d][\\w\\d\\.]+\\:(\\d)+(\\,[\\w\\d][\\w\\d\\.]+\\:(\\d)+)*"; + + private static final String LOCAL_CONFIG_REGEX = "^[\\w_\\-\\@\\/]+\\.xml$"; + + private static final String CLASS_CONFIG_REGEX = "^\\w+(\\.\\w+)*$"; + private LiteflowConfig liteflowConfig; private String zkNode; @@ -54,10 +68,13 @@ public class FlowExecutor { // XmlFlowParser parser = null; FlowParser parser = null; for(String path : rulePath){ + XmlFlowParser parser = null; + for (String path : rulePath) { try { - - if(isLocalConfig(path)) { //判断是否是本地的xml文件 + if (isLocalConfig(path)) { parser = new LocalXmlFlowParser(); + } else if (isZKConfig(path)) { + if (StrUtil.isNotBlank(zkNode)) { } else if(isLocalJsonConfig(path)) { parser = new LocalJsonFlowParser(); } else if(isLocalYmlConfig(path)) { @@ -65,12 +82,12 @@ public class FlowExecutor { } else if(isZKConfig(path)){ //判断是否是zk配置 if(StringUtils.isNotBlank(zkNode)) { parser = new ZookeeperXmlFlowParser(zkNode); - }else { + } else { parser = new ZookeeperXmlFlowParser(); } - }else if(isClassConfig(path)) { //判断是否是自定义配置 + } else if (isClassConfig(path)) { Class c = Class.forName(path); - parser = (XmlFlowParser)c.newInstance(); + parser = (XmlFlowParser) c.newInstance(); } parser.parseMain(path); } catch (Exception e) { @@ -82,15 +99,11 @@ public class FlowExecutor { } private boolean isZKConfig(String path) { - Pattern p = Pattern.compile("[\\w\\d][\\w\\d\\.]+\\:(\\d)+(\\,[\\w\\d][\\w\\d\\.]+\\:(\\d)+)*"); - Matcher m = p.matcher(path); - return m.find(); + return ReUtil.isMatch(ZK_CONFIG_REGEX, path); } private boolean isLocalConfig(String path) { - Pattern p = Pattern.compile("^[\\w_\\-\\@\\/]+\\.xml$"); - Matcher m = p.matcher(path); - return m.find(); + return ReUtil.isMatch(LOCAL_CONFIG_REGEX, path); } private boolean isLocalJsonConfig(String path) { @@ -106,81 +119,81 @@ public class FlowExecutor { } private boolean isClassConfig(String path) { - Pattern p = Pattern.compile("^\\w+(\\.\\w+)*$"); - Matcher m = p.matcher(path); - return m.find(); + return ReUtil.isMatch(CLASS_CONFIG_REGEX, path); } public void reloadRule(){ init(); } - public T execute(String chainId,Object param) throws Exception{ - return execute(chainId, param, DefaultSlot.class,null,false); + public void invoke(String chainId, Object param, Class slotClazz, Integer slotIndex) throws Exception { + execute(chainId, param, slotClazz, slotIndex,true); } - public T execute(String chainId,Object param,Class slotClazz) throws Exception{ + public LiteflowResponse execute(String chainId, Object param) throws Exception { + return execute(chainId, param, DefaultSlot.class, null, false); + } + + public LiteflowResponse execute(String chainId, Object param, Class slotClazz) throws Exception { return execute(chainId, param, slotClazz,null,false); } - public void invoke(String chainId,Object param,Class slotClazz,Integer slotIndex) throws Exception{ - execute(chainId, param, slotClazz,slotIndex,true); - } - - public T execute(String chainId,Object param,Class slotClazz,Integer slotIndex,boolean isInnerChain) throws Exception{ + public LiteflowResponse execute(String chainId, Object param, Class slotClazz, Integer slotIndex, + boolean isInnerChain) throws Exception { Slot slot = null; - if(FlowBus.needInit()) { + if (FlowBus.needInit()) { init(); } Chain chain = FlowBus.getChain(chainId); - if(chain == null){ + if (ObjectUtil.isNull(chain)) { String errorMsg = MessageFormat.format("couldn't find chain with the id[{0}]", chainId); throw new ChainNotFoundException(errorMsg); } - if(!isInnerChain && slotIndex == null) { + if (!isInnerChain && ObjectUtil.isNull(slotIndex)) { slotIndex = DataBus.offerSlot(slotClazz); - LOG.info("slot[{}] offered",slotIndex); + LOG.info("slot[{}] offered", slotIndex); } - if(slotIndex == -1){ + if (slotIndex == -1) { throw new NoAvailableSlotException("there is no available slot"); } slot = DataBus.getSlot(slotIndex); - if(slot == null) { + if (slot == null) { throw new NoAvailableSlotException("the slot is not exist"); } - if(StringUtils.isBlank(slot.getRequestId())) { + if (StrUtil.isBlank(slot.getRequestId())) { slot.generateRequestId(); - LOG.info("requestId[{}] has generated",slot.getRequestId()); + LOG.info("requestId[{}] has generated", slot.getRequestId()); } - if(!isInnerChain) { + if (!isInnerChain) { slot.setRequestData(param); slot.setChainName(chainId); - }else { + } else { slot.setChainReqData(chainId, param); } - + LiteflowResponse response = new LiteflowResponse<>(slot); try { // 执行chain chain.execute(slotIndex); } catch (Exception e) { + response.setSuccess(false); + response.setMessage(e.getMessage()); + response.setCause(e.getCause()); LOG.error("[{}]:chain[{}] execute error on slot[{}]", slot.getRequestId(), chain.getChainName(), slotIndex); - slot.setSuccess(false); - slot.setErrorMsg(e.getMessage()); } finally { if (!isInnerChain) { slot.printStep(); DataBus.releaseSlot(slotIndex); } } - return (T)slot; + return response; } public String getZkNode() { @@ -198,4 +211,5 @@ public class FlowExecutor { public void setLiteflowConfig(LiteflowConfig liteflowConfig) { this.liteflowConfig = liteflowConfig; } + } diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/core/NodeComponent.java b/liteflow-core/src/main/java/com/yomahub/liteflow/core/NodeComponent.java index 8942aa0d7..927e4a17a 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/core/NodeComponent.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/core/NodeComponent.java @@ -7,26 +7,24 @@ */ package com.yomahub.liteflow.core; +import cn.hutool.core.date.StopWatch; import cn.hutool.core.util.ObjectUtil; -import com.yomahub.liteflow.entity.flow.Executable; -import com.yomahub.liteflow.spring.ComponentScaner; -import org.apache.commons.lang3.StringUtils; -import org.apache.commons.lang3.time.StopWatch; +import cn.hutool.core.util.StrUtil; +import com.alibaba.ttl.TransmittableThreadLocal; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; -import com.yomahub.liteflow.entity.flow.Node; import com.yomahub.liteflow.entity.data.CmpStep; import com.yomahub.liteflow.entity.data.CmpStepType; import com.yomahub.liteflow.entity.data.DataBus; import com.yomahub.liteflow.entity.data.Slot; +import com.yomahub.liteflow.entity.flow.Executable; +import com.yomahub.liteflow.entity.flow.Node; import com.yomahub.liteflow.entity.monitor.CompStatistics; import com.yomahub.liteflow.flow.FlowBus; import com.yomahub.liteflow.monitor.MonitorBus; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.lang.Nullable; - -import javax.annotation.Resource; +import com.yomahub.liteflow.spring.ComponentScaner; /** * 普通组件抽象类 @@ -36,7 +34,7 @@ public abstract class NodeComponent { private static final Logger LOG = LoggerFactory.getLogger(NodeComponent.class); - private InheritableThreadLocal slotIndexTL = new InheritableThreadLocal(); + private TransmittableThreadLocal slotIndexTL = new TransmittableThreadLocal(); @Autowired(required = false) private MonitorBus monitorBus; @@ -44,7 +42,7 @@ public abstract class NodeComponent { private String nodeId; //是否结束整个流程,这个只对串行流程有效,并行流程无效 - private InheritableThreadLocal isEndTL = new InheritableThreadLocal<>(); + private TransmittableThreadLocal isEndTL = new TransmittableThreadLocal<>(); public void execute() throws Exception{ Slot slot = this.getSlot(); @@ -53,39 +51,34 @@ public abstract class NodeComponent { StopWatch stopWatch = new StopWatch(); stopWatch.start(); - //process前置处理 - if(ComponentScaner.cmpAroundAspect != null){ + // process前置处理 + if (ObjectUtil.isNotNull(ComponentScaner.cmpAroundAspect)) { ComponentScaner.cmpAroundAspect.beforeProcess(slot); } - //业务处理逻辑 process(); - //process后置处理 - if(ComponentScaner.cmpAroundAspect != null){ + // process后置处理 + if (ObjectUtil.isNotNull(ComponentScaner.cmpAroundAspect)) { ComponentScaner.cmpAroundAspect.afterProcess(slot); } stopWatch.stop(); - long timeSpent = stopWatch.getTime(); - + // slot.addStep(new CmpStep(nodeId, CmpStepType.END)); - - //性能统计 - CompStatistics statistics = new CompStatistics(); - statistics.setComponentClazzName(this.getClass().getSimpleName()); - statistics.setTimeSpent(timeSpent); - if (ObjectUtil.isNotNull(monitorBus)){ + final long timeSpent = stopWatch.getTotalTimeMillis(); + // 性能统计 + if (ObjectUtil.isNotNull(monitorBus)) { + CompStatistics statistics = new CompStatistics(this.getClass().getSimpleName(), timeSpent); monitorBus.addStatistics(statistics); } - //进行判断是否是条件节点,条件节点最终也会落到node节点或者chain节点上 - if(this instanceof NodeCondComponent){ + if (this instanceof NodeCondComponent) { String condNodeId = slot.getCondResult(this.getClass().getName()); - if(StringUtils.isNotBlank(condNodeId)){ + if (StrUtil.isNotBlank(condNodeId)) { Node thisNode = FlowBus.getNode(nodeId); Executable condExecutor = thisNode.getCondNode(condNodeId); - if(condExecutor != null){ + if (ObjectUtil.isNotNull(condExecutor)) { condExecutor.execute(slotIndexTL.get()); } } @@ -118,9 +111,9 @@ public abstract class NodeComponent { */ public boolean isEnd() { Boolean isEnd = isEndTL.get(); - if(isEnd == null){ + if(ObjectUtil.isNull(isEnd)){ return false; - }else{ + } else { return isEndTL.get(); } } diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/entity/data/AbsSlot.java b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/data/AbsSlot.java index 52bff7d0b..9d07a6e0a 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/entity/data/AbsSlot.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/data/AbsSlot.java @@ -7,42 +7,38 @@ */ package com.yomahub.liteflow.entity.data; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.ArrayDeque; import java.util.Deque; import java.util.Iterator; import java.util.concurrent.ConcurrentHashMap; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - /** * Slot的抽象类实现 * @author Bryan.Zhang */ @SuppressWarnings("unchecked") -public abstract class AbsSlot implements Slot{ +public abstract class AbsSlot implements Slot { private static final Logger LOG = LoggerFactory.getLogger(Slot.class); - private final String REQUEST = "request"; + private static final String REQUEST = "request"; - private final String RESPONSE = "response"; + private static final String RESPONSE = "response"; - private final String CHAINNAME = "chain_name"; + private static final String CHAINNAME = "chain_name"; - private final String COND_NODE_PREFIX = "cond_"; + private static final String COND_NODE_PREFIX = "cond_"; - private final String NODE_INPUT_PREFIX = "input_"; + private static final String NODE_INPUT_PREFIX = "input_"; - private final String NODE_OUTPUT_PREFIX = "output_"; + private static final String NODE_OUTPUT_PREFIX = "output_"; - private final String CHAIN_REQ_PREFIX = "chain_req_"; + private static final String CHAIN_REQ_PREFIX = "chain_req_"; - private final String REQUEST_ID = "req_id"; - - private boolean isSuccess = true; - - private String errorMsg; + private static final String REQUEST_ID = "req_id"; private Deque executeSteps = new ArrayDeque(); @@ -142,20 +138,4 @@ public abstract class AbsSlot implements Slot{ public Deque getExecuteSteps() { return executeSteps; } - - public boolean isSuccess() { - return isSuccess; - } - - public void setSuccess(boolean success) { - isSuccess = success; - } - - public String getErrorMsg() { - return errorMsg; - } - - public void setErrorMsg(String errorMsg) { - this.errorMsg = errorMsg; - } } diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/entity/data/CmpStep.java b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/data/CmpStep.java index 7eaa7c14c..3278ceabb 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/entity/data/CmpStep.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/data/CmpStep.java @@ -9,6 +9,8 @@ package com.yomahub.liteflow.entity.data; import java.text.MessageFormat; +import cn.hutool.core.util.ObjectUtil; + /** * 组件步骤对象 * @author Bryan.Zhang @@ -52,7 +54,7 @@ public class CmpStep { @Override public boolean equals(Object obj) { - if (obj == null) { + if (ObjectUtil.isNull(obj)) { return false; }else { if(getClass() != obj.getClass()) { diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/entity/data/DataBus.java b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/data/DataBus.java index 0cd59e561..93d9adb20 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/entity/data/DataBus.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/data/DataBus.java @@ -10,6 +10,7 @@ package com.yomahub.liteflow.entity.data; import java.util.concurrent.atomic.AtomicInteger; import cn.hutool.core.util.ObjectUtil; +import com.yomahub.liteflow.exception.ConfigErrorException; import com.yomahub.liteflow.property.LiteflowConfig; import com.yomahub.liteflow.util.SpringAware; import org.slf4j.Logger; @@ -29,19 +30,18 @@ public class DataBus { static { LiteflowConfig liteflowConfig = SpringAware.getBean(LiteflowConfig.class); - int slotSize = 1024; - if (ObjectUtil.isNotNull(liteflowConfig)){ - if (ObjectUtil.isNotNull(liteflowConfig.getSlotSize())){ - slotSize = liteflowConfig.getSlotSize(); - } + + if (ObjectUtil.isNull(liteflowConfig)){ + throw new ConfigErrorException("config error, please check liteflow config property"); } + int slotSize = liteflowConfig.getSlotSize(); slots = new Slot[slotSize]; } public synchronized static int offerSlot(Class slotClazz){ try{ for(int i = 0; i < slots.length; i++){ - if(slots[i] == null){ + if(ObjectUtil.isNull(slots[i])){ slots[i] = slotClazz.newInstance(); OCCUPY_COUNT.incrementAndGet(); return i; @@ -60,7 +60,7 @@ public class DataBus { } public static void releaseSlot(int slotIndex){ - if(slots[slotIndex] != null){ + if(ObjectUtil.isNotNull(slots[slotIndex])){ LOG.info("[{}]:slot[{}] released",slots[slotIndex].getRequestId(),slotIndex); slots[slotIndex] = null; OCCUPY_COUNT.decrementAndGet(); diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/entity/data/LiteflowResponse.java b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/data/LiteflowResponse.java new file mode 100644 index 000000000..4d46a0d40 --- /dev/null +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/data/LiteflowResponse.java @@ -0,0 +1,58 @@ +package com.yomahub.liteflow.entity.data; + +import java.io.Serializable; + +/** + * 执行结果封装类 + * @author zend.wang + */ +public class LiteflowResponse implements Serializable { + + private static final long serialVersionUID = -2792556188993845048L; + + private boolean success; + + private String message; + + private Throwable cause; + + private T data; + + public LiteflowResponse(T data) { + this.success = true; + this.message = ""; + this.data = data; + } + + public boolean isSuccess() { + return success; + } + + public void setSuccess(final boolean success) { + this.success = success; + } + + public String getMessage() { + return message; + } + + public void setMessage(final String message) { + this.message = message; + } + + public Throwable getCause() { + return cause; + } + + public void setCause(final Throwable cause) { + this.cause = cause; + } + + public T getData() { + return data; + } + + public void setData(final T data) { + this.data = data; + } +} diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/entity/data/Slot.java b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/data/Slot.java index 4ba30a028..ab917ecf7 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/entity/data/Slot.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/data/Slot.java @@ -51,12 +51,4 @@ public interface Slot { public void setChainName(String chainName); public String getChainName(); - - public boolean isSuccess(); - - public void setSuccess(boolean success); - - public String getErrorMsg(); - - public void setErrorMsg(String errorMsg); } diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/Chain.java b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/Chain.java index 320b10ee1..75b4e642f 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/Chain.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/Chain.java @@ -8,20 +8,23 @@ */ package com.yomahub.liteflow.entity.flow; +import cn.hutool.core.collection.CollUtil; import cn.hutool.core.util.ObjectUtil; import com.yomahub.liteflow.entity.data.DataBus; import com.yomahub.liteflow.entity.data.Slot; import com.yomahub.liteflow.enums.ExecuteTypeEnum; +import com.yomahub.liteflow.exception.ConfigErrorException; import com.yomahub.liteflow.exception.FlowSystemException; +import com.yomahub.liteflow.exception.WhenExecuteException; import com.yomahub.liteflow.property.LiteflowConfig; +import com.yomahub.liteflow.util.ExecutorHelper; import com.yomahub.liteflow.util.SpringAware; -import org.apache.commons.collections4.CollectionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; /** * chain对象,实现可执行器 @@ -35,14 +38,24 @@ public class Chain implements Executable { private List conditionList; - private static int whenMaxWaitSeconds; + private static ExecutorService parallelExecutor; + + private static final LiteflowConfig liteflowConfig; static { - LiteflowConfig liteflowConfig = SpringAware.getBean(LiteflowConfig.class); - if (ObjectUtil.isNotNull(liteflowConfig)) { - whenMaxWaitSeconds = liteflowConfig.getWhenMaxWaitSeconds(); - } else { - whenMaxWaitSeconds = 15; + //这里liteflowConfig不可能为null + //如果在springboot环境,由于自动装配,所以不可能为null + //在spring环境,如果xml没配置,在FlowExecutor的init时候就已经报错了 + liteflowConfig = SpringAware.getBean(LiteflowConfig.class); + + //这里为了严谨,还是判断了下 + if (ObjectUtil.isNull(liteflowConfig)){ + throw new ConfigErrorException("config error, please check liteflow config property"); + } + + parallelExecutor = SpringAware.getBean(ExecutorService.class); + if (ObjectUtil.isNull(parallelExecutor)){ + parallelExecutor = ExecutorHelper.buildExecutor(liteflowConfig.getWhenMaxWorkers(), liteflowConfig.getWhenQueueLimit(), "liteflow-when-thread", false); } } @@ -70,7 +83,7 @@ public class Chain implements Executable { //执行chain的主方法 @Override public void execute(Integer slotIndex) throws Exception { - if (CollectionUtils.isEmpty(conditionList)) { + if (CollUtil.isEmpty(conditionList)) { throw new FlowSystemException("no conditionList in this chain[" + chainName + "]"); } @@ -88,11 +101,8 @@ public class Chain implements Executable { } } } else if (condition instanceof WhenCondition) { - final CountDownLatch latch = new CountDownLatch(condition.getNodeList().size()); - for (Executable executableItem : condition.getNodeList()) { - new WhenConditionThread(executableItem, slotIndex, slot.getRequestId(), latch).start(); - } - latch.await(whenMaxWaitSeconds, TimeUnit.SECONDS); + + executeAsyncCondition((WhenCondition) condition, slotIndex, slot.getRequestId()); } } } @@ -106,4 +116,58 @@ public class Chain implements Executable { public String getExecuteName() { return chainName; } + + + // 使用线程池执行when并发流程 + private void executeAsyncCondition(WhenCondition condition, Integer slotIndex, String requestId) { + final CountDownLatch latch = new CountDownLatch(condition.getNodeList().size()); + final List> futures = new ArrayList<>(condition.getNodeList().size()); + + + for (int i = 0; i < condition.getNodeList().size(); i++) { + futures.add(parallelExecutor.submit( + new ParallelCallable(condition.getNodeList().get(i), slotIndex, requestId, latch) + )); + } + + boolean interrupted = false; + try { + if (!latch.await(liteflowConfig.getWhenMaxWaitSeconds(), TimeUnit.SECONDS)) { + for (Future f : futures) { + f.cancel(true); + } + interrupted = true; + LOG.warn("requestId [{}] executing async condition has reached max-wait-seconds, condition canceled.", requestId); + } + } catch (InterruptedException e) { + interrupted = true; + } + + /** + * 当配置了errorResume = false,出现interrupted或者!f.get()的情况,将抛出WhenExecuteException + */ + if (!condition.isErrorResume()) { + if (interrupted) { + throw new WhenExecuteException(String.format( + "requestId [%s] when execute interrupted. errorResume [false].", requestId)); + } + + for (Future f : futures) { + try { + if (!f.get()) { + throw new WhenExecuteException(String.format( + "requestId [%s] when execute failed. errorResume [false].", requestId)); + } + } catch (InterruptedException | ExecutionException e) { + throw new WhenExecuteException(String.format( + "requestId [%s] when execute failed. errorResume [false].", requestId)); + } + } + + } else if (interrupted) { + // 这里由于配置了errorResume,所以只打印warn日志 + LOG.warn("requestId [{}] executing when condition timeout , but ignore with errorResume.", requestId); + } + + } } diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/Node.java b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/Node.java index a53677c8a..705196eb5 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/Node.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/Node.java @@ -11,6 +11,7 @@ import java.text.MessageFormat; import java.util.HashMap; import java.util.Map; +import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; import com.yomahub.liteflow.core.NodeComponent; import com.yomahub.liteflow.entity.data.DataBus; @@ -83,7 +84,7 @@ public class Node implements Executable{ //所有的可执行节点,其实最终都会落到node上来,因为chain中包含的也是node @Override public void execute(Integer slotIndex) throws Exception { - if(instance == null){ + if (ObjectUtil.isNull(instance)) { throw new FlowSystemException("there is no instance for node id " + id); } //每次执行node前,把分配的slot index信息放入threadLocal里 @@ -92,30 +93,30 @@ public class Node implements Executable{ try{ //判断是否可执行,所以isAccess经常作为一个组件进入的实际判断要素,用作检查slot里的参数的完备性 - if(instance.isAccess()){ + if (instance.isAccess()) { //执行业务逻辑的主要入口 instance.execute(); //如果组件覆盖了isEnd方法,或者在在逻辑中主要调用了setEnd(true)的话,流程就会立马结束 - if(instance.isEnd()){ + if (instance.isEnd()) { String errorInfo = StrUtil.format("[{}]:component[{}] lead the chain to end",slot.getRequestId(),instance.getClass().getSimpleName()); throw new ChainEndException(errorInfo); } - }else{ + } else { LOG.info("[{}]:[X]skip component[{}] execution",slot.getRequestId(),instance.getClass().getSimpleName()); } - }catch (Exception e){ + } catch (Exception e) { //如果组件覆盖了isContinueOnError方法,返回为true,那即便出了异常,也会继续流程 - if(instance.isContinueOnError()){ + if (instance.isContinueOnError()) { String errorMsg = MessageFormat.format("[{0}]:component[{1}] cause error,but flow is still go on", slot.getRequestId(),id); LOG.error(errorMsg,e); - }else{ + } else { String errorMsg = MessageFormat.format("[{0}]:component[{1}] cause error,error:{2}",slot.getRequestId(),id,e.getMessage()); LOG.error(errorMsg,e); throw e; } - }finally { + } finally { //移除threadLocal里的信息 instance.removeSlotIndex(); instance.removeIsEnd(); diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/WhenConditionThread.java b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/ParallelCallable.java similarity index 53% rename from liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/WhenConditionThread.java rename to liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/ParallelCallable.java index bc42440d0..d956e29d3 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/WhenConditionThread.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/ParallelCallable.java @@ -2,6 +2,7 @@ package com.yomahub.liteflow.entity.flow; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; @@ -9,9 +10,9 @@ import java.util.concurrent.CountDownLatch; * 并行器线程 * @author Bryan.Zhang */ -public class WhenConditionThread extends Thread { +public class ParallelCallable implements Callable { - private static final Logger LOG = LoggerFactory.getLogger(WhenConditionThread.class); + private static final Logger LOG = LoggerFactory.getLogger(ParallelCallable.class); private Executable executableItem; @@ -21,7 +22,7 @@ public class WhenConditionThread extends Thread { private CountDownLatch latch; - public WhenConditionThread(Executable executableItem,Integer slotIndex,String requestId,CountDownLatch latch){ + public ParallelCallable(Executable executableItem, Integer slotIndex, String requestId, CountDownLatch latch) { this.executableItem = executableItem; this.slotIndex = slotIndex; this.requestId = requestId; @@ -29,12 +30,16 @@ public class WhenConditionThread extends Thread { } @Override - public void run() { - try{ + public Boolean call() throws Exception { + try { executableItem.execute(slotIndex); + + return true; }catch(Exception e){ - LOG.error("item [{}] execute cause error",executableItem.getExecuteName(),e); - }finally{ + LOG.error("requestId [{}], item [{}] execute error", requestId, executableItem.getExecuteName()); + + return false; + } finally { latch.countDown(); } } diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/WhenCondition.java b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/WhenCondition.java index fbcd9715e..13a202537 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/WhenCondition.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/WhenCondition.java @@ -14,9 +14,20 @@ import java.util.List; * @author Bryan.Zhang */ public class WhenCondition extends Condition{ + // 增加errorResume属性,以区分当when调用链调用失败时是否继续往下执行 + private boolean errorResume; public WhenCondition(List nodeList) { super(nodeList); + errorResume = true; } + public WhenCondition(List nodeList, boolean errorResume) { + super(nodeList); + this.errorResume = errorResume; + } + + public boolean isErrorResume() { + return errorResume; + } } diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/entity/monitor/CompStatistics.java b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/monitor/CompStatistics.java index ad9c10d2a..70a8ec1a4 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/entity/monitor/CompStatistics.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/entity/monitor/CompStatistics.java @@ -19,6 +19,13 @@ public class CompStatistics implements Comparable{ private long memorySpent; + private long recordTime; + + public CompStatistics(String componentClazzName, long timeSpent) { + this.componentClazzName = componentClazzName; + this.timeSpent = timeSpent; + this.recordTime = System.currentTimeMillis(); + } public String getComponentClazzName() { return componentClazzName; } @@ -42,9 +49,16 @@ public class CompStatistics implements Comparable{ public void setMemorySpent(long memorySpent) { this.memorySpent = memorySpent; } - + + public long getRecordTime() { + return recordTime; + } + @Override public int compareTo(Object o) { - return -1; + if( o instanceof CompStatistics) { + return this.recordTime >= ((CompStatistics) o).getRecordTime() ? -1 : 1; + } + return 1; } } diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/exception/WhenExecuteException.java b/liteflow-core/src/main/java/com/yomahub/liteflow/exception/WhenExecuteException.java new file mode 100644 index 000000000..ebe3f8662 --- /dev/null +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/exception/WhenExecuteException.java @@ -0,0 +1,21 @@ +package com.yomahub.liteflow.exception; + + +public class WhenExecuteException extends RuntimeException { + private static final long serialVersionUID = 1L; + + /** 异常信息 */ + private String message; + + public WhenExecuteException(String message) { + this.message = message; + } + + public String getMessage() { + return message; + } + + public void setMessage(String message) { + this.message = message; + } +} diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/FlowBus.java b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/FlowBus.java index f0288b737..4ce599809 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/FlowBus.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/FlowBus.java @@ -10,7 +10,7 @@ package com.yomahub.liteflow.flow; import java.util.HashMap; import java.util.Map; -import org.apache.commons.collections4.MapUtils; +import cn.hutool.core.map.MapUtil; import com.yomahub.liteflow.entity.flow.Chain; import com.yomahub.liteflow.entity.flow.Node; @@ -29,7 +29,7 @@ public class FlowBus { } public static Chain getChain(String id) throws Exception { - if (MapUtils.isEmpty(chainMap)) { + if (MapUtil.isEmpty(chainMap)) { throw new Exception("please config the rule first"); } return chainMap.get(id); @@ -44,10 +44,10 @@ public class FlowBus { } public static boolean needInit() { - return MapUtils.isEmpty(chainMap); + return MapUtil.isEmpty(chainMap); } - public static boolean containNode(String nodeId){ + public static boolean containNode(String nodeId) { return nodeMap.containsKey(nodeId); } diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/monitor/MonitorBus.java b/liteflow-core/src/main/java/com/yomahub/liteflow/monitor/MonitorBus.java index d7b6e4242..d2109aaa1 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/monitor/MonitorBus.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/monitor/MonitorBus.java @@ -12,22 +12,22 @@ import java.math.RoundingMode; import java.text.MessageFormat; import java.util.ArrayList; import java.util.Collections; -import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.TimerTask; import java.util.Map.Entry; import java.util.Timer; import java.util.concurrent.ConcurrentHashMap; -import cn.hutool.core.collection.BoundedPriorityQueue; import cn.hutool.core.util.ObjectUtil; -import com.yomahub.liteflow.property.LiteflowConfig; +import cn.hutool.core.collection.BoundedPriorityQueue; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import com.yomahub.liteflow.entity.data.DataBus; +import com.yomahub.liteflow.property.LiteflowConfig; import com.yomahub.liteflow.entity.monitor.CompStatistics; /** @@ -38,14 +38,6 @@ public class MonitorBus { private LiteflowConfig liteflowConfig; - private boolean enableLog = false; - - private int queueLimit = 200; - - private long delay = 300000; - - private long preiod = 300000; - private final Logger LOG = LoggerFactory.getLogger(this.getClass()); private final ConcurrentHashMap> statisticsMap = new ConcurrentHashMap<>(); @@ -53,25 +45,9 @@ public class MonitorBus { public MonitorBus(LiteflowConfig liteflowConfig) { this.liteflowConfig = liteflowConfig; - if (ObjectUtil.isNotNull(liteflowConfig.getEnableLog())){ - this.enableLog = liteflowConfig.getEnableLog(); - } - - if (ObjectUtil.isNotNull(liteflowConfig.getQueueLimit())){ - queueLimit = liteflowConfig.getQueueLimit(); - } - - if (ObjectUtil.isNotNull(liteflowConfig.getDelay())){ - delay = liteflowConfig.getDelay(); - } - - if (ObjectUtil.isNotNull(liteflowConfig.getPeriod())){ - preiod = liteflowConfig.getPeriod(); - } - - if(enableLog){ + if(liteflowConfig.getEnableLog()){ Timer timer = new Timer(); - timer.schedule(new MonitorTimeTask(this), delay, preiod); + timer.schedule(new MonitorTimeTask(this), liteflowConfig.getDelay(), liteflowConfig.getPeriod()); } } @@ -79,7 +55,7 @@ public class MonitorBus { if(statisticsMap.containsKey(statistics.getComponentClazzName())){ statisticsMap.get(statistics.getComponentClazzName()).add(statistics); }else{ - BoundedPriorityQueue queue = new BoundedPriorityQueue<>(queueLimit); + BoundedPriorityQueue queue = new BoundedPriorityQueue<>(liteflowConfig.getQueueLimit()); queue.offer(statistics); statisticsMap.put(statistics.getComponentClazzName(), queue); } @@ -88,10 +64,9 @@ public class MonitorBus { public void printStatistics(){ try{ Map compAverageTimeSpent = new HashMap(); - - long totalTimeSpent = 0; - + for(Entry> entry : statisticsMap.entrySet()){ + long totalTimeSpent = 0; for(CompStatistics statistics : entry.getValue()){ totalTimeSpent += statistics.getTimeSpent(); } diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/parser/XmlFlowParser.java b/liteflow-core/src/main/java/com/yomahub/liteflow/parser/XmlFlowParser.java index f1b1c849a..3d3109aae 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/parser/XmlFlowParser.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/parser/XmlFlowParser.java @@ -12,7 +12,7 @@ import com.yomahub.liteflow.entity.flow.*; import com.yomahub.liteflow.exception.ExecutableItemNotFoundException; import com.yomahub.liteflow.exception.ParseException; import com.yomahub.liteflow.util.SpringAware; -import org.apache.commons.lang3.StringUtils; +import org.dom4j.Attribute; import org.dom4j.Document; import org.dom4j.DocumentHelper; import org.dom4j.Element; @@ -98,7 +98,7 @@ public abstract class XmlFlowParser extends FlowParser{ for (Iterator it = e.elementIterator(); it.hasNext();) { Element condE = it.next(); condArrayStr = condE.attributeValue("value"); - if (StringUtils.isBlank(condArrayStr)) { + if (StrUtil.isBlank(condArrayStr)) { continue; } chainNodeList = new ArrayList<>(); @@ -137,7 +137,12 @@ public abstract class XmlFlowParser extends FlowParser{ if (condE.getName().equals("then")) { conditionList.add(new ThenCondition(chainNodeList)); } else if (condE.getName().equals("when")) { - conditionList.add(new WhenCondition(chainNodeList)); + Attribute errorResume = condE.attribute("errorResume"); + if (errorResume != null) { + conditionList.add(new WhenCondition(chainNodeList, errorResume.getValue().equals(Boolean.TRUE.toString()))); + } else { + conditionList.add(new WhenCondition(chainNodeList)); + } } } FlowBus.addChain(chainName, new Chain(chainName,conditionList)); diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/parser/ZookeeperXmlFlowParser.java b/liteflow-core/src/main/java/com/yomahub/liteflow/parser/ZookeeperXmlFlowParser.java index c1be7e1f1..a5ac76a7d 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/parser/ZookeeperXmlFlowParser.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/parser/ZookeeperXmlFlowParser.java @@ -2,7 +2,8 @@ package com.yomahub.liteflow.parser; import java.text.MessageFormat; -import org.apache.commons.lang3.StringUtils; +import cn.hutool.core.util.StrUtil; + import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.cache.NodeCache; @@ -10,6 +11,7 @@ import org.apache.curator.framework.recipes.cache.NodeCacheListener; import org.apache.curator.retry.RetryNTimes; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import com.yomahub.liteflow.exception.ParseException; /** @@ -45,7 +47,7 @@ public class ZookeeperXmlFlowParser extends XmlFlowParser{ String content = new String(client.getData().forPath(nodePath)); - if(StringUtils.isBlank(content)) { + if (StrUtil.isBlank(content)) { String error = MessageFormat.format("the node[{0}] value is empty", nodePath); throw new ParseException(error); } diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/property/LiteflowConfig.java b/liteflow-core/src/main/java/com/yomahub/liteflow/property/LiteflowConfig.java index 513704519..736d2678d 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/property/LiteflowConfig.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/property/LiteflowConfig.java @@ -7,8 +7,13 @@ */ package com.yomahub.liteflow.property; +import cn.hutool.core.util.ObjectUtil; + /** * liteflow的配置实体类 + * 这个类中的属性为什么不用基本类型,而用包装类型呢 + * 是因为这个类是springboot和spring的最终参数获取器,考虑到spring的场景,有些参数不是必须配置。基本类型就会出现默认值的情况。 + * 所以为了要有null值出现,这里采用包装类型 * @author Bryan.Zhang */ public class LiteflowConfig { @@ -34,6 +39,12 @@ public class LiteflowConfig { //每隔多少秒打印 private Long period; + //异步线程池最大线程数 + private Integer whenMaxWorkers; + + //异步线程池最大队列数量 + private Integer whenQueueLimit; + public String getRuleSource() { return ruleSource; } @@ -43,7 +54,11 @@ public class LiteflowConfig { } public Integer getSlotSize() { - return slotSize; + if (ObjectUtil.isNull(slotSize)){ + return 1024; + }else{ + return slotSize; + } } public void setSlotSize(Integer slotSize) { @@ -51,7 +66,11 @@ public class LiteflowConfig { } public Integer getWhenMaxWaitSeconds() { - return whenMaxWaitSeconds; + if (ObjectUtil.isNull(whenMaxWaitSeconds)){ + return 15; + }else{ + return whenMaxWaitSeconds; + } } public void setWhenMaxWaitSeconds(Integer whenMaxWaitSeconds) { @@ -59,7 +78,11 @@ public class LiteflowConfig { } public Integer getQueueLimit() { - return queueLimit; + if (ObjectUtil.isNull(queueLimit)){ + return 200; + }else{ + return queueLimit; + } } public void setQueueLimit(Integer queueLimit) { @@ -67,7 +90,11 @@ public class LiteflowConfig { } public Long getDelay() { - return delay; + if (ObjectUtil.isNull(delay)){ + return 300000L; + }else{ + return delay; + } } public void setDelay(Long delay) { @@ -75,7 +102,11 @@ public class LiteflowConfig { } public Long getPeriod() { - return period; + if (ObjectUtil.isNull(period)){ + return 300000L; + }else{ + return period; + } } public void setPeriod(Long period) { @@ -83,10 +114,38 @@ public class LiteflowConfig { } public Boolean getEnableLog() { - return enableLog; + if (ObjectUtil.isNull(enableLog)){ + return false; + }else{ + return enableLog; + } } public void setEnableLog(Boolean enableLog) { this.enableLog = enableLog; } + + public Integer getWhenMaxWorkers() { + if (ObjectUtil.isNull(whenMaxWorkers)){ + return Runtime.getRuntime().availableProcessors() * 2; + }else{ + return whenMaxWorkers; + } + } + + public void setWhenMaxWorkers(Integer whenMaxWorkers) { + this.whenMaxWorkers = whenMaxWorkers; + } + + public Integer getWhenQueueLimit() { + if (ObjectUtil.isNull(whenQueueLimit)){ + return 512; + }else{ + return whenQueueLimit; + } + } + + public void setWhenQueueLimit(Integer whenQueueLimit) { + this.whenQueueLimit = whenQueueLimit; + } } diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/spring/ComponentScaner.java b/liteflow-core/src/main/java/com/yomahub/liteflow/spring/ComponentScaner.java index 9375dcf99..2eafea230 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/spring/ComponentScaner.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/spring/ComponentScaner.java @@ -10,13 +10,14 @@ package com.yomahub.liteflow.spring; import java.util.HashMap; import java.util.Map; -import com.yomahub.liteflow.aop.ICmpAroundAspect; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.BeansException; import org.springframework.beans.factory.config.BeanPostProcessor; import org.springframework.core.Ordered; import org.springframework.core.PriorityOrdered; + +import com.yomahub.liteflow.aop.ICmpAroundAspect; import com.yomahub.liteflow.core.NodeComponent; import com.yomahub.liteflow.util.LOGOPrinter; @@ -33,7 +34,7 @@ public class ComponentScaner implements BeanPostProcessor, PriorityOrdered { public static ICmpAroundAspect cmpAroundAspect; static { - //打印liteflow的LOGO + // 打印liteflow的LOGO LOGOPrinter.print(); } @@ -46,18 +47,18 @@ public class ComponentScaner implements BeanPostProcessor, PriorityOrdered { @Override public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException { Class clazz = bean.getClass(); - //组件的扫描发现,扫到之后缓存到类属性map中 - if(NodeComponent.class.isAssignableFrom(clazz)){ - LOG.info("component[{}] has been found",beanName); - NodeComponent nodeComponent = (NodeComponent)bean; + // 组件的扫描发现,扫到之后缓存到类属性map中 + if (NodeComponent.class.isAssignableFrom(clazz)) { + LOG.info("component[{}] has been found", beanName); + NodeComponent nodeComponent = (NodeComponent) bean; nodeComponent.setNodeId(beanName); nodeComponentMap.put(beanName, nodeComponent); } - //组件Aop的实现类加载 - if(ICmpAroundAspect.class.isAssignableFrom(clazz)){ - LOG.info("component aspect implement[{}] has been found",beanName); - cmpAroundAspect = (ICmpAroundAspect)bean; + // 组件Aop的实现类加载 + if (ICmpAroundAspect.class.isAssignableFrom(clazz)) { + LOG.info("component aspect implement[{}] has been found", beanName); + cmpAroundAspect = (ICmpAroundAspect) bean; } return bean; diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/util/ExecutorHelper.java b/liteflow-core/src/main/java/com/yomahub/liteflow/util/ExecutorHelper.java new file mode 100644 index 000000000..e093869a7 --- /dev/null +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/util/ExecutorHelper.java @@ -0,0 +1,98 @@ +/** + *

Title: liteflow

+ *

Description: 轻量级的组件式流程框架

+ * @author Bryan.Zhang + * @email weenyc31@163.com + * @Date 2020/4/1 + */ +package com.yomahub.liteflow.util; + +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicLong; + + +/** + * 线程池工具类 + * @author Bryan.Zhang + */ +public class ExecutorHelper { + + private ExecutorHelper() { + } + + /** + * 使用默认的等待时间1分钟,来关闭目标线程组。 + *

+ * + * @param pool 需要关闭的线程组. + */ + public static void shutdownAwaitTermination(ExecutorService pool) { + shutdownAwaitTermination(pool, 60L); + } + + /** + * 关闭ExecutorService的线程管理者 + *

+ * + * @param pool 需要关闭的管理者 + * @param timeout 等待时间 + */ + public static void shutdownAwaitTermination(ExecutorService pool, + long timeout) { + pool.shutdown(); + try { + if (!pool.awaitTermination(timeout, TimeUnit.SECONDS)) { + pool.shutdownNow(); + if (!pool.awaitTermination(timeout, TimeUnit.SECONDS)) { + System.err.println("Pool did not terminate."); + } + } + } catch (InterruptedException ie) { + pool.shutdownNow(); + Thread.currentThread().interrupt(); + } + } + + /** + * 返回一个线程工厂,这是一个可以定义线程名字的线程工厂,返回的线程都将被命名. + * 创建的线程都是非后台线程. + * + * @param name 名称. + * @return 线程工厂实例. + */ + public static ThreadFactory buildExecutorFactory(final String name) { + return buildExecutorFactory(name, false); + } + + /** + * 返回一个线程工厂,这是一个可以定义线程名字的线程工厂,返回的线程都将被命名. + * + * @param name 名称. + * @param daemon 是否为后台线程. + * @return 线程工厂实例. + */ + public static ThreadFactory buildExecutorFactory(final String name, final boolean daemon) { + return new ThreadFactory() { + + private final AtomicLong number = new AtomicLong(); + + @Override + public Thread newThread(Runnable r) { + Thread newThread = Executors.defaultThreadFactory().newThread(r); + newThread.setName(name + "-" + number.getAndIncrement()); + newThread.setDaemon(daemon); + return newThread; + } + + }; + } + + public static ExecutorService buildExecutor(int worker, int queue, String namePrefix, boolean daemon) { + return new ThreadPoolExecutor(worker, worker, + 0L, TimeUnit.MILLISECONDS, + new ArrayBlockingQueue<>(queue), + buildExecutorFactory(namePrefix, daemon), + new ThreadPoolExecutor.AbortPolicy() + ); + } +} diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/util/LimitQueue.java b/liteflow-core/src/main/java/com/yomahub/liteflow/util/LimitQueue.java index 13c4a3861..b4a889990 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/util/LimitQueue.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/util/LimitQueue.java @@ -9,7 +9,6 @@ package com.yomahub.liteflow.util; import java.util.Collection; import java.util.Iterator; -import java.util.LinkedList; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/util/Shutdown.java b/liteflow-core/src/main/java/com/yomahub/liteflow/util/Shutdown.java new file mode 100644 index 000000000..5076944c0 --- /dev/null +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/util/Shutdown.java @@ -0,0 +1,26 @@ +package com.yomahub.liteflow.util; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.PreDestroy; +import java.util.concurrent.ExecutorService; + +/** + * 关闭shutdown类 + * 执行清理工作 + * @author Bryan.Zhang + */ +public class Shutdown { + + private static final Logger LOG = LoggerFactory.getLogger(Shutdown.class); + + @PreDestroy + public void destroy() throws Exception { + ExecutorService executorService = SpringAware.getBean("whenExecutors"); + + LOG.info("Start closing the liteflow-when-calls..."); + ExecutorHelper.shutdownAwaitTermination(executorService); + LOG.info("Succeed closing the liteflow-when-calls ok..."); + } +} diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/util/SpringAware.java b/liteflow-core/src/main/java/com/yomahub/liteflow/util/SpringAware.java index a0bedede6..65dca6221 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/util/SpringAware.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/util/SpringAware.java @@ -1,7 +1,5 @@ package com.yomahub.liteflow.util; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.springframework.beans.BeansException; import org.springframework.beans.factory.NoSuchBeanDefinitionException; import org.springframework.beans.factory.config.BeanDefinition; @@ -15,7 +13,7 @@ import org.springframework.context.ApplicationContextAware; * @author Bryan.Zhang */ public class SpringAware implements ApplicationContextAware { - private static final Logger log = LoggerFactory.getLogger(SpringAware.class); + private static ApplicationContext applicationContext = null; public SpringAware() { @@ -30,11 +28,21 @@ public class SpringAware implements ApplicationContextAware { } public static T getBean(String name) { - return (T) applicationContext.getBean(name); + try{ + T t = (T) applicationContext.getBean(name); + return t; + }catch (Exception e){ + return null; + } } public static T getBean(Class clazz) { - return applicationContext.getBean(clazz); + try{ + T t = applicationContext.getBean(clazz); + return t; + }catch (Exception e){ + return null; + } } public static T registerBean(Class c) { @@ -45,12 +53,12 @@ public class SpringAware implements ApplicationContextAware { return getBean(c.getName()); } - public static T registerOrGet(Class clazz){ + public static T registerOrGet(Class clazz) { T t = null; - try{ + try { t = SpringAware.getBean(clazz); - }catch (NoSuchBeanDefinitionException e){ - if(t == null){ + } catch (NoSuchBeanDefinitionException e) { + if (t == null) { t = SpringAware.registerBean(clazz); } } diff --git a/liteflow-core/src/test/java/com/yomahub/liteflow/monitor/BoundedPriorityQueueTest.java b/liteflow-core/src/test/java/com/yomahub/liteflow/monitor/BoundedPriorityQueueTest.java new file mode 100644 index 000000000..a705ba105 --- /dev/null +++ b/liteflow-core/src/test/java/com/yomahub/liteflow/monitor/BoundedPriorityQueueTest.java @@ -0,0 +1,28 @@ +package com.yomahub.liteflow.monitor; + +import java.util.Iterator; +import java.util.PriorityQueue; +import cn.hutool.core.collection.BoundedPriorityQueue; +import cn.hutool.core.util.RandomUtil; +import org.junit.Test; + +import com.yomahub.liteflow.entity.monitor.CompStatistics; + +public class BoundedPriorityQueueTest { + + @Test + public void test() throws InterruptedException { + PriorityQueue queue = new BoundedPriorityQueue<>(6); + for (int i = 0; i < 20 ; i ++) { + long randomTime = RandomUtil.randomLong(20); + Thread.sleep(randomTime); + queue.add(new CompStatistics("comp" + i, randomTime)); + } + + Iterator iterator = queue.iterator(); + while(iterator.hasNext()) { + CompStatistics compStatistics = iterator.next(); + System.out.println(compStatistics.getComponentClazzName() + " " + compStatistics.getTimeSpent()); + } + } +} diff --git a/liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/LiteflowExecutorAutoConfiguration.java b/liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/LiteflowExecutorAutoConfiguration.java new file mode 100644 index 000000000..7c81f3cb2 --- /dev/null +++ b/liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/LiteflowExecutorAutoConfiguration.java @@ -0,0 +1,34 @@ +package com.yomahub.liteflow.springboot; + +import com.yomahub.liteflow.property.LiteflowConfig; +import com.yomahub.liteflow.util.ExecutorHelper; +import com.yomahub.liteflow.util.Shutdown; +import org.springframework.boot.autoconfigure.AutoConfigureAfter; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.util.concurrent.ExecutorService; + +/** + * 线程池装配类 + * 这个装配前置条件是需要LiteflowConfig,LiteflowPropertyAutoConfiguration以及SpringAware + * @author justin.xu + */ +@Configuration +@ConditionalOnBean(LiteflowConfig.class) +@AutoConfigureAfter({LiteflowPropertyAutoConfiguration.class}) +public class LiteflowExecutorAutoConfiguration { + + @Bean("whenExecutors") + public ExecutorService executorService(LiteflowConfig liteflowConfig) { + Integer useWorker = liteflowConfig.getWhenMaxWorkers(); + Integer useQueue = liteflowConfig.getWhenQueueLimit(); + return ExecutorHelper.buildExecutor(useWorker, useQueue, "liteflow-when-thead", false); + } + + @Bean + public Shutdown shutdown() { + return new Shutdown(); + } +} diff --git a/liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/LiteflowExecutorInit.java b/liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/LiteflowExecutorInit.java index 6099211ce..de3a6bd79 100644 --- a/liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/LiteflowExecutorInit.java +++ b/liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/LiteflowExecutorInit.java @@ -2,7 +2,6 @@ package com.yomahub.liteflow.springboot; import com.yomahub.liteflow.core.FlowExecutor; import org.springframework.beans.factory.InitializingBean; -import javax.annotation.Resource; /** * 执行器初始化类 diff --git a/liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/LiteflowMainAutoConfiguration.java b/liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/LiteflowMainAutoConfiguration.java index 3fe8a8a69..0877f73d3 100644 --- a/liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/LiteflowMainAutoConfiguration.java +++ b/liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/LiteflowMainAutoConfiguration.java @@ -1,25 +1,15 @@ package com.yomahub.liteflow.springboot; import cn.hutool.core.util.StrUtil; -import com.google.common.collect.Lists; import com.yomahub.liteflow.core.FlowExecutor; -import com.yomahub.liteflow.entity.data.DataBus; import com.yomahub.liteflow.monitor.MonitorBus; import com.yomahub.liteflow.property.LiteflowConfig; -import com.yomahub.liteflow.spring.ComponentScaner; import com.yomahub.liteflow.util.SpringAware; -import org.apache.commons.lang3.StringUtils; import org.springframework.boot.autoconfigure.AutoConfigureAfter; import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Import; -import org.springframework.context.annotation.PropertySource; - -import javax.swing.*; -import java.util.List; /** * 主要的业务装配器 @@ -29,28 +19,28 @@ import java.util.List; */ @Configuration @ConditionalOnBean(LiteflowConfig.class) -@AutoConfigureAfter(LiteflowPropertyAutoConfiguration.class) +@AutoConfigureAfter({LiteflowPropertyAutoConfiguration.class}) @Import(SpringAware.class) public class LiteflowMainAutoConfiguration { @Bean - public FlowExecutor flowExecutor(LiteflowConfig liteflowConfig){ - if(StrUtil.isNotBlank(liteflowConfig.getRuleSource())){ + public FlowExecutor flowExecutor(LiteflowConfig liteflowConfig) { + if (StrUtil.isNotBlank(liteflowConfig.getRuleSource())) { FlowExecutor flowExecutor = new FlowExecutor(); flowExecutor.setLiteflowConfig(liteflowConfig); return flowExecutor; - }else{ + } else { return null; } } @Bean - public LiteflowExecutorInit liteflowExecutorInit(FlowExecutor flowExecutor){ + public LiteflowExecutorInit liteflowExecutorInit(FlowExecutor flowExecutor) { return new LiteflowExecutorInit(flowExecutor); } @Bean - public MonitorBus monitorBus(LiteflowConfig liteflowConfig){ + public MonitorBus monitorBus(LiteflowConfig liteflowConfig) { return new MonitorBus(liteflowConfig); } } diff --git a/liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/LiteflowProperty.java b/liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/LiteflowProperty.java index 0a6ddbc09..8f538eb48 100644 --- a/liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/LiteflowProperty.java +++ b/liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/LiteflowProperty.java @@ -18,6 +18,12 @@ public class LiteflowProperty { //异步线程最大等待描述 private int whenMaxWaitSeconds; + //异步线程池最大线程数 + private int whenMaxWorkers; + + //异步线程池最大队列数量 + private int whenQueueLimit; + public String getRuleSource() { return ruleSource; } @@ -41,4 +47,20 @@ public class LiteflowProperty { public void setWhenMaxWaitSeconds(int whenMaxWaitSeconds) { this.whenMaxWaitSeconds = whenMaxWaitSeconds; } + + public int getWhenMaxWorkers() { + return whenMaxWorkers; + } + + public void setWhenMaxWorkers(int whenMaxWorkers) { + this.whenMaxWorkers = whenMaxWorkers; + } + + public int getWhenQueueLimit() { + return whenQueueLimit; + } + + public void setWhenQueueLimit(int whenQueueLimit) { + this.whenQueueLimit = whenQueueLimit; + } } diff --git a/liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/LiteflowPropertyAutoConfiguration.java b/liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/LiteflowPropertyAutoConfiguration.java index e43d14842..6d773f3e3 100644 --- a/liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/LiteflowPropertyAutoConfiguration.java +++ b/liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/LiteflowPropertyAutoConfiguration.java @@ -31,6 +31,8 @@ public class LiteflowPropertyAutoConfiguration { liteflowConfig.setQueueLimit(liteflowMonitorProperty.getQueueLimit()); liteflowConfig.setDelay(liteflowMonitorProperty.getDelay()); liteflowConfig.setPeriod(liteflowMonitorProperty.getPeriod()); + liteflowConfig.setWhenMaxWorkers(property.getWhenMaxWorkers()); + liteflowConfig.setWhenQueueLimit(property.getWhenQueueLimit()); return liteflowConfig; } } diff --git a/liteflow-spring-boot-starter/src/main/resources/META-INF/liteflow-default.properties b/liteflow-spring-boot-starter/src/main/resources/META-INF/liteflow-default.properties index ef7873807..60ac4ce71 100644 --- a/liteflow-spring-boot-starter/src/main/resources/META-INF/liteflow-default.properties +++ b/liteflow-spring-boot-starter/src/main/resources/META-INF/liteflow-default.properties @@ -1,7 +1,9 @@ liteflow.rule-source=config/flow.xml liteflow.slot-size=1024 liteflow.when-max-wait-second=15 +liteflow.when-max-workers=4 +liteflow.when-queue-limit=512 liteflow.monitor.enable-log=false liteflow.monitor.queue-limit=200 liteflow.monitor.delay=300000 -liteflow.monitor.period=300000 \ No newline at end of file +liteflow.monitor.period=300000 diff --git a/liteflow-spring-boot-starter/src/main/resources/META-INF/spring.factories b/liteflow-spring-boot-starter/src/main/resources/META-INF/spring.factories index 23669427e..85b579284 100644 --- a/liteflow-spring-boot-starter/src/main/resources/META-INF/spring.factories +++ b/liteflow-spring-boot-starter/src/main/resources/META-INF/spring.factories @@ -1,6 +1,7 @@ org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ com.yomahub.liteflow.springboot.LiteflowComponentScannerAutoConfiguration,\ com.yomahub.liteflow.springboot.LiteflowPropertyAutoConfiguration,\ + com.yomahub.liteflow.springboot.LiteflowExecutorAutoConfiguration,\ com.yomahub.liteflow.springboot.LiteflowMainAutoConfiguration diff --git a/liteflow-test-spring/src/main/java/com/yomahub/flowtest/Runner.java b/liteflow-test-spring/src/main/java/com/yomahub/flowtest/Runner.java index 31becd387..c465152d9 100644 --- a/liteflow-test-spring/src/main/java/com/yomahub/flowtest/Runner.java +++ b/liteflow-test-spring/src/main/java/com/yomahub/flowtest/Runner.java @@ -1,7 +1,7 @@ package com.yomahub.flowtest; import com.yomahub.liteflow.core.FlowExecutor; -import com.yomahub.liteflow.entity.data.Slot; +import com.yomahub.liteflow.entity.data.LiteflowResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.support.ClassPathXmlApplicationContext; @@ -16,8 +16,8 @@ public class Runner { context.start(); log.info("启动成功"); FlowExecutor flowExecutor = context.getBean(FlowExecutor.class); - Slot slot = flowExecutor.execute("chain3", "it's a request"); - System.out.println(slot); + LiteflowResponse response = flowExecutor.execute("chain3", "it's a request"); + System.out.println(response); while (true){ Thread.sleep(60000); diff --git a/liteflow-test-spring/src/main/resources/applicationContext.xml b/liteflow-test-spring/src/main/resources/applicationContext.xml index 944baa722..6c9fb5111 100644 --- a/liteflow-test-spring/src/main/resources/applicationContext.xml +++ b/liteflow-test-spring/src/main/resources/applicationContext.xml @@ -16,11 +16,13 @@ - + + + @@ -31,5 +33,6 @@ + diff --git a/liteflow-test-springboot/src/main/java/com/yomahub/flowtest/TestFlow.java b/liteflow-test-springboot/src/main/java/com/yomahub/flowtest/TestFlow.java index ba6286bf4..5e072cd98 100644 --- a/liteflow-test-springboot/src/main/java/com/yomahub/flowtest/TestFlow.java +++ b/liteflow-test-springboot/src/main/java/com/yomahub/flowtest/TestFlow.java @@ -1,9 +1,7 @@ package com.yomahub.flowtest; import com.yomahub.liteflow.core.FlowExecutor; -import com.yomahub.liteflow.entity.data.Slot; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import com.yomahub.liteflow.entity.data.LiteflowResponse; import org.springframework.boot.CommandLineRunner; import org.springframework.stereotype.Component; @@ -17,7 +15,7 @@ public class TestFlow implements CommandLineRunner { @Override public void run(String... args) throws Exception { - Slot slot = flowExecutor.execute("chain2", "it's a request"); - System.out.println(slot); + LiteflowResponse response= flowExecutor.execute("chain2", "it's a request"); + System.out.println(response); } } diff --git a/liteflow-test-springboot/src/main/java/com/yomahub/flowtest/components/EComponent.java b/liteflow-test-springboot/src/main/java/com/yomahub/flowtest/components/EComponent.java index 62b9e2af3..8e87c67ac 100644 --- a/liteflow-test-springboot/src/main/java/com/yomahub/flowtest/components/EComponent.java +++ b/liteflow-test-springboot/src/main/java/com/yomahub/flowtest/components/EComponent.java @@ -17,13 +17,9 @@ public class EComponent extends NodeComponent { public void process() { try { Thread.sleep(120L); - System.out.println("E:" + this.getSlot().getOutput("a")); - this.getSlot().setOutput(this.getNodeId(), "E component output"); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Eomponent executed!"); - } - } diff --git a/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/ConcurrentCase.java b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/ConcurrentCase.java new file mode 100644 index 000000000..e50b26fe9 --- /dev/null +++ b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/ConcurrentCase.java @@ -0,0 +1,130 @@ +package com.yomahub.flowtest.concurrent; + +import org.junit.Assert; + +import java.util.AbstractMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; + +/** + * 流程的顺序执行、并发执行的CASE构造器 + * @author justin.xu + */ +public class ConcurrentCase { + public static final Map, List>> CASES = new ConcurrentHashMap<>(); + + /** + * 初始化一个测试用例的预期 + * @param request + * @param expected + */ + public static void caseInit(String request, List expected) { + CASES.put(request, new AbstractMap.SimpleEntry, List>(expected, new CopyOnWriteArrayList<>())); + } + + /** + * 添加这个测试用例的实际 + * @param request + * @param actual + */ + public static void caseAdd(String request, Routers actual) { + CASES.computeIfPresent(request, (k, v) -> { + v.getValue().add(actual); + return v; + }); + } + + /** + * 测试当前的Expected与Actual是否相同 + * + * @param request + */ + public static void caseAssert(String request) { + AbstractMap.SimpleEntry, List> ca = CASES.get(request); + Assert.assertNotNull(ca); + + Assert.assertEquals(ca.getKey(), ca.getValue()); + + if (ca.getValue().size() > 0) { + Integer expectedIndex = null; + for (Routers actual : ca.getValue()) { + + if (expectedIndex == null) { + expectedIndex = actual.getIndex(); + } else { + Assert.assertEquals(expectedIndex.intValue(), actual.getIndex()); + } + } + } + } + + /** + * 测试当前的Expected与Actual是否相同 + * + * @param request + */ + public static void caseAssertRandom(String request) { + AbstractMap.SimpleEntry, List> ca = CASES.get(request); + Assert.assertNotNull(ca); + + Assert.assertEquals(ca.getKey().size(), ca.getValue().size()); + + if (ca.getValue().size() > 0) { + Integer expectedIndex = null; + for (Routers actual : ca.getValue()) { + boolean find = false; + for(Routers routers : ca.getKey()) { + if (routers.getValue().equals(actual.getValue())) { + find = true; + } + } + Assert.assertTrue(find); + + if (expectedIndex == null) { + expectedIndex = actual.getIndex(); + } else { + Assert.assertEquals(expectedIndex.intValue(), actual.getIndex()); + } + } + } + } + + + public static class Routers { + int index; + String value; + + public Routers(String value) { + this.index = -1; + this.value = value; + } + public Routers(int index, String value) { + this.index = index; + this.value = value; + } + + public int getIndex() { + return index; + } + + public String getValue() { + return value; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Routers routers = (Routers) o; + return value.equals(routers.value); + } + + @Override + public int hashCode() { + return Objects.hash(value); + } + } +} diff --git a/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/SpringBootApp.java b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/SpringBootApp.java new file mode 100644 index 000000000..9846e2fb5 --- /dev/null +++ b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/SpringBootApp.java @@ -0,0 +1,20 @@ +package com.yomahub.flowtest.concurrent; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration; + +/** + * 启动类 + * @author justin.xu + */ +@SpringBootApplication(exclude={DataSourceAutoConfiguration.class}) +public class SpringBootApp { + /** + * @param args + */ + public static void main(String[] args) { + SpringApplication.run(SpringBootApp.class, args); + } + +} diff --git a/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/TestParseFlow.java b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/TestParseFlow.java new file mode 100644 index 000000000..ff647fbfb --- /dev/null +++ b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/TestParseFlow.java @@ -0,0 +1,85 @@ +package com.yomahub.flowtest.concurrent; + + +import com.yomahub.liteflow.entity.flow.Chain; +import com.yomahub.liteflow.entity.flow.Condition; +import com.yomahub.liteflow.entity.flow.ThenCondition; +import com.yomahub.liteflow.entity.flow.WhenCondition; +import com.yomahub.liteflow.flow.FlowBus; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.ActiveProfiles; +import org.springframework.test.context.junit4.SpringRunner; + +import java.util.AbstractMap; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + + +/** + * 测试流程的解析 + * @author justin.xu + */ +@ActiveProfiles("test") +@RunWith(SpringRunner.class) +@SpringBootTest +public class TestParseFlow { + + private Check caseErrorResume = new Check("test-errorResume", Arrays.asList( + new AbstractMap.SimpleEntry, Boolean>(ThenCondition.class, null), + new AbstractMap.SimpleEntry, Boolean>(WhenCondition.class, true), + new AbstractMap.SimpleEntry, Boolean>(WhenCondition.class, true), + new AbstractMap.SimpleEntry, Boolean>(WhenCondition.class, true) + )); + + private Check caseErrorBreak = new Check("test-errorBreak", Arrays.asList( + new AbstractMap.SimpleEntry, Boolean>(ThenCondition.class, null), + new AbstractMap.SimpleEntry, Boolean>(WhenCondition.class, true), + new AbstractMap.SimpleEntry, Boolean>(WhenCondition.class, false), + new AbstractMap.SimpleEntry, Boolean>(WhenCondition.class, true) + )); + + @Test + public void parseWhen() throws Exception { + assertTrue(caseErrorResume, FlowBus.getChain(caseErrorResume.getChainCode())); + + assertTrue(caseErrorBreak, FlowBus.getChain(caseErrorBreak.getChainCode())); + } + + private void assertTrue(Check check, Chain chain) { + Assert.assertNotNull(chain); + + Assert.assertTrue(null != chain.getConditionList() && !chain.getConditionList().isEmpty()); + for (int i = 0; i < chain.getConditionList().size(); i ++) { + + AbstractMap.SimpleEntry, Boolean> expected = check.getClazzWithFlags().get(i); + Condition actual = chain.getConditionList().get(i); + + Assert.assertEquals(expected.getKey(), actual.getClass()); + if (actual.getClass().equals(WhenCondition.class)) { + Assert.assertEquals(expected.getValue(), ((WhenCondition) actual).isErrorResume()); + } + } + } + + public static class Check { + private String chainCode; + private List, Boolean>> clazzWithFlags; + + public Check(String chainCode, List, Boolean>> clazzWithFlags) { + this.chainCode = chainCode; + this.clazzWithFlags = clazzWithFlags; + } + + public String getChainCode() { + return chainCode; + } + + public List, Boolean>> getClazzWithFlags() { + return clazzWithFlags; + } + } +} diff --git a/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/TestRunFlow.java b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/TestRunFlow.java new file mode 100644 index 000000000..fad9ca62c --- /dev/null +++ b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/TestRunFlow.java @@ -0,0 +1,90 @@ +package com.yomahub.flowtest.concurrent; + +import com.yomahub.liteflow.core.FlowExecutor; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.ActiveProfiles; +import org.springframework.test.context.junit4.SpringRunner; + +import javax.annotation.Resource; +import java.util.Arrays; +import java.util.List; +import java.util.UUID; +import java.util.stream.Collectors; + +import static com.yomahub.flowtest.concurrent.ConcurrentCase.caseAssertRandom; +import static com.yomahub.flowtest.concurrent.ConcurrentCase.caseInit; + +/** + * 测试流程的顺序执行、并发执行等 + * @author justin.xu + */ +@ActiveProfiles("test") +@RunWith(SpringRunner.class) +@SpringBootTest +public class TestRunFlow { + + @Resource + private FlowExecutor flowExecutor; + + private String init(List steps) { + + String requestId = UUID.randomUUID().toString(); + + caseInit(requestId, steps.stream().map(ConcurrentCase.Routers::new).collect(Collectors.toList())); + + return requestId; + } + + @Test + public void mixedRunByErrorResumeTest() throws Exception { + //由于errorResume,即使p5执行失败抛出异常, p7, p8也将会执行 + String requestId = init(Arrays.asList("s1", "s2", "s3", "s4", "s5", "s6", "p3", "p4", "p6", "p7", "p8")); + + flowExecutor.execute("test-errorResume", requestId); + + caseAssertRandom(requestId); + } + + + @Test + public void mixedRunByErrorBreakTest() throws Exception { + //由于errorBreak,p5执行失败抛出异常, p7, p8将不会执行 + String requestId = init(Arrays.asList("s1", "s2", "s3", "s4", "s5", "s6", "p3", "p4", "p6")); + + flowExecutor.execute("test-errorBreak", requestId); + + caseAssertRandom(requestId); + } + + @Test + public void parallelTest() throws InterruptedException { + //测试2个线程并发时,所执行的序列是正常的,线程安全的(slotIndex在每个执行序列chain中都是不变的) + String requestId1 = init(Arrays.asList("c1", "c2", "c3", "c4", "c5")); + String requestId2 = init(Arrays.asList("c6", "c7", "c8", "c9", "c10")); + + List ts = Arrays.asList( + newExecutor("async-concurrent1", requestId1), + newExecutor("async-concurrent2", requestId2) + ); + ts.forEach(Thread::start); + + for (Thread t : ts) { + t.join(); + } + + caseAssertRandom(requestId1); + caseAssertRandom(requestId2); + } + + private Thread newExecutor(String chain, String requestId) { + return new Thread(() -> { + try { + flowExecutor.execute(chain, requestId); + } catch (Exception e) { + e.printStackTrace(); + } + }); + } +} diff --git a/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/c/C10Component.java b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/c/C10Component.java new file mode 100644 index 000000000..360878450 --- /dev/null +++ b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/c/C10Component.java @@ -0,0 +1,22 @@ +package com.yomahub.flowtest.concurrent.mock.component.c; + +import com.yomahub.flowtest.concurrent.ConcurrentCase; +import com.yomahub.liteflow.core.NodeComponent; +import org.springframework.stereotype.Component; + +/** + * 测试mock component + * @author justin.xu + */ +@Component("c10") +public class C10Component extends NodeComponent { + + private static final String name = "c10"; + + @Override + public void process() throws Exception { + Thread.sleep(1_000); + ConcurrentCase.caseAdd((String) getSlot().getRequestData(), new ConcurrentCase.Routers(getSlotIndex(), name)); + System.out.println(String.format("[%s] component executed, index[%d].", name, getSlotIndex())); + } +} \ No newline at end of file diff --git a/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/c/C1Component.java b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/c/C1Component.java new file mode 100644 index 000000000..641a67629 --- /dev/null +++ b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/c/C1Component.java @@ -0,0 +1,22 @@ +package com.yomahub.flowtest.concurrent.mock.component.c; + +import com.yomahub.flowtest.concurrent.ConcurrentCase; +import com.yomahub.liteflow.core.NodeComponent; +import org.springframework.stereotype.Component; + +/** + * 测试mock component + * @author justin.xu + */ +@Component("c1") +public class C1Component extends NodeComponent { + + private static final String name = "c1"; + + @Override + public void process() throws Exception { + Thread.sleep(1_000); + ConcurrentCase.caseAdd((String) getSlot().getRequestData(), new ConcurrentCase.Routers(getSlotIndex(), name)); + System.out.println(String.format("[%s] component executed, index[%d].", name, getSlotIndex())); + } +} diff --git a/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/c/C2Component.java b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/c/C2Component.java new file mode 100644 index 000000000..7aa9ea9ab --- /dev/null +++ b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/c/C2Component.java @@ -0,0 +1,22 @@ +package com.yomahub.flowtest.concurrent.mock.component.c; + +import com.yomahub.flowtest.concurrent.ConcurrentCase; +import com.yomahub.liteflow.core.NodeComponent; +import org.springframework.stereotype.Component; + +/** + * 测试mock component + * @author justin.xu + */ +@Component("c2") +public class C2Component extends NodeComponent { + + private static final String name = "c2"; + + @Override + public void process() throws Exception { + Thread.sleep(1_000); + ConcurrentCase.caseAdd((String) getSlot().getRequestData(), new ConcurrentCase.Routers(getSlotIndex(), name)); + System.out.println(String.format("[%s] component executed, index[%d].", name, getSlotIndex())); + } +} diff --git a/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/c/C3Component.java b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/c/C3Component.java new file mode 100644 index 000000000..104d2e1fd --- /dev/null +++ b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/c/C3Component.java @@ -0,0 +1,22 @@ +package com.yomahub.flowtest.concurrent.mock.component.c; + +import com.yomahub.flowtest.concurrent.ConcurrentCase; +import com.yomahub.liteflow.core.NodeComponent; +import org.springframework.stereotype.Component; + +/** + * 测试mock component + * @author justin.xu + */ +@Component("c3") +public class C3Component extends NodeComponent { + + private static final String name = "c3"; + + @Override + public void process() throws Exception { + Thread.sleep(1_000); + ConcurrentCase.caseAdd((String) getSlot().getRequestData(), new ConcurrentCase.Routers(getSlotIndex(), name)); + System.out.println(String.format("[%s] component executed, index[%d].", name, getSlotIndex())); + } +} diff --git a/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/c/C4Component.java b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/c/C4Component.java new file mode 100644 index 000000000..207b20309 --- /dev/null +++ b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/c/C4Component.java @@ -0,0 +1,22 @@ +package com.yomahub.flowtest.concurrent.mock.component.c; + +import com.yomahub.flowtest.concurrent.ConcurrentCase; +import com.yomahub.liteflow.core.NodeComponent; +import org.springframework.stereotype.Component; + +/** + * 测试mock component + * @author justin.xu + */ +@Component("c4") +public class C4Component extends NodeComponent { + + private static final String name = "c4"; + + @Override + public void process() throws Exception { + Thread.sleep(1_000); + ConcurrentCase.caseAdd((String) getSlot().getRequestData(), new ConcurrentCase.Routers(getSlotIndex(), name)); + System.out.println(String.format("[%s] component executed, index[%d].", name, getSlotIndex())); + } +} \ No newline at end of file diff --git a/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/c/C5Component.java b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/c/C5Component.java new file mode 100644 index 000000000..a5877c041 --- /dev/null +++ b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/c/C5Component.java @@ -0,0 +1,22 @@ +package com.yomahub.flowtest.concurrent.mock.component.c; + +import com.yomahub.flowtest.concurrent.ConcurrentCase; +import com.yomahub.liteflow.core.NodeComponent; +import org.springframework.stereotype.Component; + +/** + * 测试mock component + * @author justin.xu + */ +@Component("c5") +public class C5Component extends NodeComponent { + + private static final String name = "c5"; + + @Override + public void process() throws Exception { + Thread.sleep(1_000); + ConcurrentCase.caseAdd((String) getSlot().getRequestData(), new ConcurrentCase.Routers(getSlotIndex(), name)); + System.out.println(String.format("[%s] component executed, index[%d].", name, getSlotIndex())); + } +} \ No newline at end of file diff --git a/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/c/C6Component.java b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/c/C6Component.java new file mode 100644 index 000000000..af3bca011 --- /dev/null +++ b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/c/C6Component.java @@ -0,0 +1,22 @@ +package com.yomahub.flowtest.concurrent.mock.component.c; + +import com.yomahub.flowtest.concurrent.ConcurrentCase; +import com.yomahub.liteflow.core.NodeComponent; +import org.springframework.stereotype.Component; + +/** + * 测试mock component + * @author justin.xu + */ +@Component("c6") +public class C6Component extends NodeComponent { + + private static final String name = "c6"; + + @Override + public void process() throws Exception { + Thread.sleep(1_000); + ConcurrentCase.caseAdd((String) getSlot().getRequestData(), new ConcurrentCase.Routers(getSlotIndex(), name)); + System.out.println(String.format("[%s] component executed, index[%d].", name, getSlotIndex())); + } +} diff --git a/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/c/C7Component.java b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/c/C7Component.java new file mode 100644 index 000000000..30f47d0d1 --- /dev/null +++ b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/c/C7Component.java @@ -0,0 +1,22 @@ +package com.yomahub.flowtest.concurrent.mock.component.c; + +import com.yomahub.flowtest.concurrent.ConcurrentCase; +import com.yomahub.liteflow.core.NodeComponent; +import org.springframework.stereotype.Component; + +/** + * 测试mock component + * @author justin.xu + */ +@Component("c7") +public class C7Component extends NodeComponent { + + private static final String name = "c7"; + + @Override + public void process() throws Exception { + Thread.sleep(1_000); + ConcurrentCase.caseAdd((String) getSlot().getRequestData(), new ConcurrentCase.Routers(getSlotIndex(), name)); + System.out.println(String.format("[%s] component executed, index[%d].", name, getSlotIndex())); + } +} diff --git a/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/c/C8Component.java b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/c/C8Component.java new file mode 100644 index 000000000..ce9bb91e2 --- /dev/null +++ b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/c/C8Component.java @@ -0,0 +1,22 @@ +package com.yomahub.flowtest.concurrent.mock.component.c; + +import com.yomahub.flowtest.concurrent.ConcurrentCase; +import com.yomahub.liteflow.core.NodeComponent; +import org.springframework.stereotype.Component; + +/** + * 测试mock component + * @author justin.xu + */ +@Component("c8") +public class C8Component extends NodeComponent { + + private static final String name = "c8"; + + @Override + public void process() throws Exception { + Thread.sleep(1_000); + ConcurrentCase.caseAdd((String) getSlot().getRequestData(), new ConcurrentCase.Routers(getSlotIndex(), name)); + System.out.println(String.format("[%s] component executed, index[%d].", name, getSlotIndex())); + } +} diff --git a/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/c/C9Component.java b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/c/C9Component.java new file mode 100644 index 000000000..f770ac294 --- /dev/null +++ b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/c/C9Component.java @@ -0,0 +1,22 @@ +package com.yomahub.flowtest.concurrent.mock.component.c; + +import com.yomahub.flowtest.concurrent.ConcurrentCase; +import com.yomahub.liteflow.core.NodeComponent; +import org.springframework.stereotype.Component; + +/** + * 测试mock component + * @author justin.xu + */ +@Component("c9") +public class C9Component extends NodeComponent { + + private static final String name = "c9"; + + @Override + public void process() throws Exception { + Thread.sleep(1_000); + ConcurrentCase.caseAdd((String) getSlot().getRequestData(), new ConcurrentCase.Routers(getSlotIndex(), name)); + System.out.println(String.format("[%s] component executed, index[%d].", name, getSlotIndex())); + } +} diff --git a/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/p/P3Component.java b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/p/P3Component.java new file mode 100644 index 000000000..eaaa96423 --- /dev/null +++ b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/p/P3Component.java @@ -0,0 +1,21 @@ +package com.yomahub.flowtest.concurrent.mock.component.p; + +import com.yomahub.flowtest.concurrent.ConcurrentCase; +import com.yomahub.liteflow.core.NodeComponent; +import org.springframework.stereotype.Component; + +/** + * 测试mock component + * @author justin.xu + */ +@Component("p3") +public class P3Component extends NodeComponent { + + private static final String name = "p3"; + + @Override + public void process() throws Exception { + ConcurrentCase.caseAdd((String) getSlot().getRequestData(), new ConcurrentCase.Routers(getSlotIndex(), name)); + System.out.println(String.format("[%s] component executed, index[%d].", name, getSlotIndex())); + } +} diff --git a/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/p/P4Component.java b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/p/P4Component.java new file mode 100644 index 000000000..4b22bb1ad --- /dev/null +++ b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/p/P4Component.java @@ -0,0 +1,21 @@ +package com.yomahub.flowtest.concurrent.mock.component.p; + +import com.yomahub.flowtest.concurrent.ConcurrentCase; +import com.yomahub.liteflow.core.NodeComponent; +import org.springframework.stereotype.Component; + +/** + * 测试mock component + * @author justin.xu + */ +@Component("p4") +public class P4Component extends NodeComponent { + + private static final String name = "p4"; + + @Override + public void process() throws Exception { + ConcurrentCase.caseAdd((String) getSlot().getRequestData(), new ConcurrentCase.Routers(getSlotIndex(), name)); + System.out.println(String.format("[%s] component executed, index[%d].", name, getSlotIndex())); + } +} diff --git a/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/p/P5Component.java b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/p/P5Component.java new file mode 100644 index 000000000..c4463fc26 --- /dev/null +++ b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/p/P5Component.java @@ -0,0 +1,19 @@ +package com.yomahub.flowtest.concurrent.mock.component.p; + +import com.yomahub.liteflow.core.NodeComponent; +import org.springframework.stereotype.Component; + +/** + * 测试mock component + * @author justin.xu + */ +@Component("p5") +public class P5Component extends NodeComponent { + + private static final String name = "p5"; + + @Override + public void process() throws Exception { + throw new RuntimeException(String.format("test mock error [%s]", name)); + } +} \ No newline at end of file diff --git a/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/p/P6Component.java b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/p/P6Component.java new file mode 100644 index 000000000..a17f1ffcd --- /dev/null +++ b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/p/P6Component.java @@ -0,0 +1,21 @@ +package com.yomahub.flowtest.concurrent.mock.component.p; + +import com.yomahub.flowtest.concurrent.ConcurrentCase; +import com.yomahub.liteflow.core.NodeComponent; +import org.springframework.stereotype.Component; + +/** + * 测试mock component + * @author justin.xu + */ +@Component("p6") +public class P6Component extends NodeComponent { + + private static final String name = "p6"; + + @Override + public void process() throws Exception { + ConcurrentCase.caseAdd((String) getSlot().getRequestData(), new ConcurrentCase.Routers(getSlotIndex(), name)); + System.out.println(String.format("[%s] component executed, index[%d].", name, getSlotIndex())); + } +} diff --git a/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/p/P7Component.java b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/p/P7Component.java new file mode 100644 index 000000000..da1bfa819 --- /dev/null +++ b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/p/P7Component.java @@ -0,0 +1,21 @@ +package com.yomahub.flowtest.concurrent.mock.component.p; + +import com.yomahub.flowtest.concurrent.ConcurrentCase; +import com.yomahub.liteflow.core.NodeComponent; +import org.springframework.stereotype.Component; + +/** + * 测试mock component + * @author justin.xu + */ +@Component("p7") +public class P7Component extends NodeComponent { + + private static final String name = "p7"; + + @Override + public void process() throws Exception { + ConcurrentCase.caseAdd((String) getSlot().getRequestData(), new ConcurrentCase.Routers(getSlotIndex(), name)); + System.out.println(String.format("[%s] component executed, index[%d].", name, getSlotIndex())); + } +} diff --git a/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/p/P8Component.java b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/p/P8Component.java new file mode 100644 index 000000000..12f612e95 --- /dev/null +++ b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/p/P8Component.java @@ -0,0 +1,21 @@ +package com.yomahub.flowtest.concurrent.mock.component.p; + +import com.yomahub.flowtest.concurrent.ConcurrentCase; +import com.yomahub.liteflow.core.NodeComponent; +import org.springframework.stereotype.Component; + +/** + * 测试mock component + * @author justin.xu + */ +@Component("p8") +public class P8Component extends NodeComponent { + + private static final String name = "p8"; + + @Override + public void process() throws Exception { + ConcurrentCase.caseAdd((String) getSlot().getRequestData(), new ConcurrentCase.Routers(getSlotIndex(), name)); + System.out.println(String.format("[%s] component executed, index[%d].", name, getSlotIndex())); + } +} \ No newline at end of file diff --git a/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/s/S1Component.java b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/s/S1Component.java new file mode 100644 index 000000000..e78c15f35 --- /dev/null +++ b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/s/S1Component.java @@ -0,0 +1,21 @@ +package com.yomahub.flowtest.concurrent.mock.component.s; + +import com.yomahub.flowtest.concurrent.ConcurrentCase; +import com.yomahub.liteflow.core.NodeComponent; +import org.springframework.stereotype.Component; + +/** + * 测试mock component + * @author justin.xu + */ +@Component("s1") +public class S1Component extends NodeComponent { + + private static final String name = "s1"; + + @Override + public void process() throws Exception { + ConcurrentCase.caseAdd((String) getSlot().getRequestData(), new ConcurrentCase.Routers(getSlotIndex(), name)); + System.out.println(String.format("[%s] component executed, index[%d].", name, getSlotIndex())); + } +} diff --git a/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/s/S2Component.java b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/s/S2Component.java new file mode 100644 index 000000000..5d67adea1 --- /dev/null +++ b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/s/S2Component.java @@ -0,0 +1,21 @@ +package com.yomahub.flowtest.concurrent.mock.component.s; + +import com.yomahub.flowtest.concurrent.ConcurrentCase; +import com.yomahub.liteflow.core.NodeComponent; +import org.springframework.stereotype.Component; + +/** + * 测试mock component + * @author justin.xu + */ +@Component("s2") +public class S2Component extends NodeComponent { + + private static final String name = "s2"; + + @Override + public void process() throws Exception { + ConcurrentCase.caseAdd((String) getSlot().getRequestData(), new ConcurrentCase.Routers(getSlotIndex(), name)); + System.out.println(String.format("[%s] component executed, index[%d].", name, getSlotIndex())); + } +} diff --git a/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/s/S3Component.java b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/s/S3Component.java new file mode 100644 index 000000000..d85ca5822 --- /dev/null +++ b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/s/S3Component.java @@ -0,0 +1,21 @@ +package com.yomahub.flowtest.concurrent.mock.component.s; + +import com.yomahub.flowtest.concurrent.ConcurrentCase; +import com.yomahub.liteflow.core.NodeComponent; +import org.springframework.stereotype.Component; + +/** + * 测试mock component + * @author justin.xu + */ +@Component("s3") +public class S3Component extends NodeComponent { + + private static final String name = "s3"; + + @Override + public void process() throws Exception { + ConcurrentCase.caseAdd((String) getSlot().getRequestData(), new ConcurrentCase.Routers(getSlotIndex(), name)); + System.out.println(String.format("[%s] component executed, index[%d].", name, getSlotIndex())); + } +} \ No newline at end of file diff --git a/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/s/S4Component.java b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/s/S4Component.java new file mode 100644 index 000000000..47bec234d --- /dev/null +++ b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/s/S4Component.java @@ -0,0 +1,21 @@ +package com.yomahub.flowtest.concurrent.mock.component.s; + +import com.yomahub.flowtest.concurrent.ConcurrentCase; +import com.yomahub.liteflow.core.NodeComponent; +import org.springframework.stereotype.Component; + +/** + * 测试mock component + * @author justin.xu + */ +@Component("s4") +public class S4Component extends NodeComponent { + + private static final String name = "s4"; + + @Override + public void process() throws Exception { + ConcurrentCase.caseAdd((String) getSlot().getRequestData(), new ConcurrentCase.Routers(getSlotIndex(), name)); + System.out.println(String.format("[%s] component executed, index[%d].", name, getSlotIndex())); + } +} diff --git a/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/s/S5Component.java b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/s/S5Component.java new file mode 100644 index 000000000..4c20855dc --- /dev/null +++ b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/s/S5Component.java @@ -0,0 +1,21 @@ +package com.yomahub.flowtest.concurrent.mock.component.s; + +import com.yomahub.flowtest.concurrent.ConcurrentCase; +import com.yomahub.liteflow.core.NodeComponent; +import org.springframework.stereotype.Component; + +/** + * 测试mock component + * @author justin.xu + */ +@Component("s5") +public class S5Component extends NodeComponent { + + private static final String name = "s5"; + + @Override + public void process() throws Exception { + ConcurrentCase.caseAdd((String) getSlot().getRequestData(), new ConcurrentCase.Routers(getSlotIndex(), name)); + System.out.println(String.format("[%s] component executed, index[%d].", name, getSlotIndex())); + } +} \ No newline at end of file diff --git a/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/s/S6Component.java b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/s/S6Component.java new file mode 100644 index 000000000..ee2578b6c --- /dev/null +++ b/liteflow-test-springboot/src/test/java/com/yomahub/flowtest/concurrent/mock/component/s/S6Component.java @@ -0,0 +1,21 @@ +package com.yomahub.flowtest.concurrent.mock.component.s; + +import com.yomahub.flowtest.concurrent.ConcurrentCase; +import com.yomahub.liteflow.core.NodeComponent; +import org.springframework.stereotype.Component; + +/** + * 测试mock component + * @author justin.xu + */ +@Component("s6") +public class S6Component extends NodeComponent { + + private static final String name = "s6"; + + @Override + public void process() throws Exception { + ConcurrentCase.caseAdd((String) getSlot().getRequestData(), new ConcurrentCase.Routers(getSlotIndex(), name)); + System.out.println(String.format("[%s] component executed, index[%d].", name, getSlotIndex())); + } +} diff --git a/liteflow-test-springboot/src/test/resources/application-test.yml b/liteflow-test-springboot/src/test/resources/application-test.yml new file mode 100644 index 000000000..a98fcc5ea --- /dev/null +++ b/liteflow-test-springboot/src/test/resources/application-test.yml @@ -0,0 +1,18 @@ +logging: + level: + root: debug + io: + lettuce: info + pattern: + console: "%d{yyyy-MM-dd} %d{hhh:mm:ss},%red(%d{SSS}) %green(%-5level) [%thread] %cyan(%logger{36}) : %msg%n" + +server: + port: 8086 + +liteflow: + rule-source: "config/flow-test.xml" + +threadPool: + parallel: + worker: 3 + queue: 512 diff --git a/liteflow-test-springboot/src/test/resources/config/flow-test.xml b/liteflow-test-springboot/src/test/resources/config/flow-test.xml new file mode 100644 index 000000000..7bdc30870 --- /dev/null +++ b/liteflow-test-springboot/src/test/resources/config/flow-test.xml @@ -0,0 +1,27 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/pom.xml b/pom.xml index 4b0a0fec1..5046a8e76 100644 --- a/pom.xml +++ b/pom.xml @@ -41,10 +41,6 @@ UTF-8 1.8 - 3.4 - 4.1 - 2.4 - 1.2 5.0.9.RELEASE 1.7.21 1.2.17 @@ -56,25 +52,11 @@ 2.12.0 4.12 5.3.10 + 2.12.1 - - org.apache.commons - commons-lang3 - ${commons.lang3.version} - - - org.apache.commons - commons-collections4 - ${commons-collections.version} - - - commons-io - commons-io - ${commons-io.version} - org.springframework spring-beans @@ -124,11 +106,6 @@ junit junit ${junit.version} - - - commons-logging - commons-logging - ${commons-logging.version} org.apache.curator @@ -155,6 +132,11 @@ hutool-core ${hutool-core.version} + + com.alibaba + transmittable-thread-local + ${transmittable-thread-local.version} +