Merge branch 'pr_10' into v2.5.0-SNAPSHOT

# Conflicts:
#	liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/Chain.java
#	liteflow-core/src/main/java/com/yomahub/liteflow/entity/flow/ParallelCallable.java
#	liteflow-core/src/main/java/com/yomahub/liteflow/parser/XmlFlowParser.java
#	liteflow-spring-boot-starter/src/main/java/com/yomahub/liteflow/springboot/LiteflowMainAutoConfiguration.java
This commit is contained in:
bryan31
2021-03-28 16:48:42 +08:00
49 changed files with 1267 additions and 86 deletions

View File

@@ -31,6 +31,8 @@ import com.yomahub.liteflow.parser.XmlFlowParser;
import com.yomahub.liteflow.parser.ZookeeperXmlFlowParser;
import com.yomahub.liteflow.property.LiteflowConfig;
import java.util.concurrent.ExecutorService;
/**
* 流程规则主要执行器类
* @author Bryan.Zhang
@@ -180,4 +182,5 @@ public class FlowExecutor {
public void setLiteflowConfig(LiteflowConfig liteflowConfig) {
this.liteflowConfig = liteflowConfig;
}
}

View File

@@ -10,6 +10,7 @@ package com.yomahub.liteflow.entity.data;
import java.util.concurrent.atomic.AtomicInteger;
import cn.hutool.core.util.ObjectUtil;
import com.yomahub.liteflow.exception.ConfigErrorException;
import com.yomahub.liteflow.property.LiteflowConfig;
import com.yomahub.liteflow.util.SpringAware;
import org.slf4j.Logger;
@@ -29,12 +30,11 @@ public class DataBus {
static {
LiteflowConfig liteflowConfig = SpringAware.getBean(LiteflowConfig.class);
int slotSize = 1024;
if (ObjectUtil.isNotNull(liteflowConfig)){
if (ObjectUtil.isNotNull(liteflowConfig.getSlotSize())){
slotSize = liteflowConfig.getSlotSize();
}
if (ObjectUtil.isNull(liteflowConfig)){
throw new ConfigErrorException("config error, please check liteflow config property");
}
int slotSize = liteflowConfig.getSlotSize();
slots = new Slot[slotSize];
}

View File

@@ -8,38 +8,51 @@
*/
package com.yomahub.liteflow.entity.flow;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.ObjectUtil;
import com.yomahub.liteflow.entity.data.DataBus;
import com.yomahub.liteflow.entity.data.Slot;
import com.yomahub.liteflow.enums.ExecuteTypeEnum;
import com.yomahub.liteflow.exception.FlowSystemException;
import com.yomahub.liteflow.property.LiteflowConfig;
import com.yomahub.liteflow.util.SpringAware;
import org.apache.commons.collections4.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
* chain对象实现可执行器
* @author Bryan.Zhang
*/
public class Chain implements Executable {
private static final Logger LOG = LoggerFactory.getLogger(Chain.class);
private String chainName;
private List<Condition> conditionList;
private static int whenMaxWaitSeconds;
private static ExecutorService parallelExecutor;
private static final LiteflowConfig liteflowConfig;
static {
LiteflowConfig liteflowConfig = SpringAware.getBean(LiteflowConfig.class);
if (ObjectUtil.isNotNull(liteflowConfig)) {
whenMaxWaitSeconds = liteflowConfig.getWhenMaxWaitSeconds();
} else {
whenMaxWaitSeconds = 15;
//这里liteflowConfig不可能为null
//如果在springboot环境由于自动装配所以不可能为null
//在spring环境如果xml没配置在FlowExecutor的init时候就已经报错了
liteflowConfig = SpringAware.getBean(LiteflowConfig.class);
//这里为了严谨,还是判断了下
if (ObjectUtil.isNull(liteflowConfig)){
throw new ConfigErrorException("config error, please check liteflow config property");
}
parallelExecutor = SpringAware.getBean(ExecutorService.class);
if (ObjectUtil.isNull(parallelExecutor)){
parallelExecutor = ExecutorHelper.buildExecutor(liteflowConfig.getWhenMaxWorkers(), liteflowConfig.getWhenQueueLimit(), "liteflow-when-thread", false);
}
}
@@ -85,11 +98,8 @@ public class Chain implements Executable {
}
}
} else if (condition instanceof WhenCondition) {
final CountDownLatch latch = new CountDownLatch(condition.getNodeList().size());
for (Executable executableItem : condition.getNodeList()) {
new WhenConditionThread(executableItem, slotIndex, slot.getRequestId(), latch).start();
}
latch.await(whenMaxWaitSeconds, TimeUnit.SECONDS);
executeAsyncCondition((WhenCondition) condition, slotIndex, slot.getRequestId());
}
}
}
@@ -103,4 +113,58 @@ public class Chain implements Executable {
public String getExecuteName() {
return chainName;
}
// 使用线程池执行when并发流程
private void executeAsyncCondition(WhenCondition condition, Integer slotIndex, String requestId) {
final CountDownLatch latch = new CountDownLatch(condition.getNodeList().size());
final List<Future<Boolean>> futures = new ArrayList<>(condition.getNodeList().size());
for (int i = 0; i < condition.getNodeList().size(); i++) {
futures.add(parallelExecutor.submit(
new ParallelCallable(condition.getNodeList().get(i), slotIndex, requestId, latch)
));
}
boolean interrupted = false;
try {
if (!latch.await(liteflowConfig.getWhenMaxWaitSeconds(), TimeUnit.SECONDS)) {
for (Future<Boolean> f : futures) {
f.cancel(true);
}
interrupted = true;
LOG.warn("requestId [{}] executing async condition has reached max-wait-seconds, condition canceled.", requestId);
}
} catch (InterruptedException e) {
interrupted = true;
}
/**
* 当配置了errorResume = false出现interrupted或者!f.get()的情况将抛出WhenExecuteException
*/
if (!condition.isErrorResume()) {
if (interrupted) {
throw new WhenExecuteException(String.format(
"requestId [%s] when execute interrupted. errorResume [false].", requestId));
}
for (Future<Boolean> f : futures) {
try {
if (!f.get()) {
throw new WhenExecuteException(String.format(
"requestId [%s] when execute failed. errorResume [false].", requestId));
}
} catch (InterruptedException | ExecutionException e) {
throw new WhenExecuteException(String.format(
"requestId [%s] when execute failed. errorResume [false].", requestId));
}
}
} else if (interrupted) {
// 这里由于配置了errorResume所以只打印warn日志
LOG.warn("requestId [{}] executing when condition timeout , but ignore with errorResume.", requestId);
}
}
}

View File

@@ -2,6 +2,7 @@ package com.yomahub.liteflow.entity.flow;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
@@ -9,9 +10,9 @@ import java.util.concurrent.CountDownLatch;
* 并行器线程
* @author Bryan.Zhang
*/
public class WhenConditionThread extends Thread {
public class ParallelCallable implements Callable<Boolean> {
private static final Logger LOG = LoggerFactory.getLogger(WhenConditionThread.class);
private static final Logger LOG = LoggerFactory.getLogger(ParallelCallable.class);
private Executable executableItem;
@@ -21,7 +22,7 @@ public class WhenConditionThread extends Thread {
private CountDownLatch latch;
public WhenConditionThread(Executable executableItem,Integer slotIndex,String requestId,CountDownLatch latch){
public ParallelCallable(Executable executableItem, Integer slotIndex, String requestId, CountDownLatch latch) {
this.executableItem = executableItem;
this.slotIndex = slotIndex;
this.requestId = requestId;
@@ -29,11 +30,15 @@ public class WhenConditionThread extends Thread {
}
@Override
public void run() {
try{
public Boolean call() throws Exception {
try {
executableItem.execute(slotIndex);
} catch (Exception e) {
LOG.error("item [{}] execute cause error",executableItem.getExecuteName(),e);
return true;
}catch(Exception e){
LOG.error("requestId [{}], item [{}] execute error", requestId, executableItem.getExecuteName());
return false;
} finally {
latch.countDown();
}

View File

@@ -14,9 +14,20 @@ import java.util.List;
* @author Bryan.Zhang
*/
public class WhenCondition extends Condition{
// 增加errorResume属性以区分当when调用链调用失败时是否继续往下执行
private boolean errorResume;
public WhenCondition(List<Executable> nodeList) {
super(nodeList);
errorResume = true;
}
public WhenCondition(List<Executable> nodeList, boolean errorResume) {
super(nodeList);
this.errorResume = errorResume;
}
public boolean isErrorResume() {
return errorResume;
}
}

View File

@@ -0,0 +1,21 @@
package com.yomahub.liteflow.exception;
public class WhenExecuteException extends RuntimeException {
private static final long serialVersionUID = 1L;
/** 异常信息 */
private String message;
public WhenExecuteException(String message) {
this.message = message;
}
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
}

View File

@@ -38,14 +38,6 @@ public class MonitorBus {
private LiteflowConfig liteflowConfig;
private boolean enableLog = false;
private int queueLimit = 200;
private long delay = 300000;
private long preiod = 300000;
private final Logger LOG = LoggerFactory.getLogger(this.getClass());
private final ConcurrentHashMap<String, BoundedPriorityQueue<CompStatistics>> statisticsMap = new ConcurrentHashMap<>();
@@ -53,25 +45,9 @@ public class MonitorBus {
public MonitorBus(LiteflowConfig liteflowConfig) {
this.liteflowConfig = liteflowConfig;
if (ObjectUtil.isNotNull(liteflowConfig.getEnableLog())){
this.enableLog = liteflowConfig.getEnableLog();
}
if (ObjectUtil.isNotNull(liteflowConfig.getQueueLimit())){
queueLimit = liteflowConfig.getQueueLimit();
}
if (ObjectUtil.isNotNull(liteflowConfig.getDelay())){
delay = liteflowConfig.getDelay();
}
if (ObjectUtil.isNotNull(liteflowConfig.getPeriod())){
preiod = liteflowConfig.getPeriod();
}
if(enableLog){
if(liteflowConfig.getEnableLog()){
Timer timer = new Timer();
timer.schedule(new MonitorTimeTask(this), delay, preiod);
timer.schedule(new MonitorTimeTask(this), liteflowConfig.getDelay(), liteflowConfig.getPeriod());
}
}
@@ -79,7 +55,7 @@ public class MonitorBus {
if(statisticsMap.containsKey(statistics.getComponentClazzName())){
statisticsMap.get(statistics.getComponentClazzName()).add(statistics);
}else{
BoundedPriorityQueue<CompStatistics> queue = new BoundedPriorityQueue<>(queueLimit);
BoundedPriorityQueue<CompStatistics> queue = new BoundedPriorityQueue<>(liteflowConfig.getQueueLimit());
queue.offer(statistics);
statisticsMap.put(statistics.getComponentClazzName(), queue);
}

View File

@@ -7,26 +7,21 @@ import java.util.Map.Entry;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import cn.hutool.core.util.StrUtil;
import com.yomahub.liteflow.entity.flow.*;
import com.yomahub.liteflow.exception.ExecutableItemNotFoundException;
import com.yomahub.liteflow.exception.ParseException;
import com.yomahub.liteflow.util.SpringAware;
import org.apache.commons.lang3.StringUtils;
import org.dom4j.Document;
import org.dom4j.DocumentHelper;
import org.dom4j.Element;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import cn.hutool.core.util.StrUtil;
import com.yomahub.liteflow.core.NodeComponent;
import com.yomahub.liteflow.entity.flow.Chain;
import com.yomahub.liteflow.entity.flow.Condition;
import com.yomahub.liteflow.entity.flow.Executable;
import com.yomahub.liteflow.entity.flow.Node;
import com.yomahub.liteflow.entity.flow.ThenCondition;
import com.yomahub.liteflow.entity.flow.WhenCondition;
import com.yomahub.liteflow.exception.ExecutableItemNotFoundException;
import com.yomahub.liteflow.exception.ParseException;
import com.yomahub.liteflow.flow.FlowBus;
import com.yomahub.liteflow.spring.ComponentScaner;
import com.yomahub.liteflow.util.SpringAware;
/**
* xml形式的解析器
@@ -144,7 +139,12 @@ public abstract class XmlFlowParser {
if (condE.getName().equals("then")) {
conditionList.add(new ThenCondition(chainNodeList));
} else if (condE.getName().equals("when")) {
conditionList.add(new WhenCondition(chainNodeList));
Attribute errorResume = condE.attribute("errorResume");
if (errorResume != null) {
conditionList.add(new WhenCondition(chainNodeList, errorResume.getValue().equals(Boolean.TRUE.toString())));
} else {
conditionList.add(new WhenCondition(chainNodeList));
}
}
}
FlowBus.addChain(chainName, new Chain(chainName,conditionList));

View File

@@ -7,8 +7,13 @@
*/
package com.yomahub.liteflow.property;
import cn.hutool.core.util.ObjectUtil;
/**
* liteflow的配置实体类
* 这个类中的属性为什么不用基本类型,而用包装类型呢
* 是因为这个类是springboot和spring的最终参数获取器考虑到spring的场景有些参数不是必须配置。基本类型就会出现默认值的情况。
* 所以为了要有null值出现这里采用包装类型
* @author Bryan.Zhang
*/
public class LiteflowConfig {
@@ -34,6 +39,12 @@ public class LiteflowConfig {
//每隔多少秒打印
private Long period;
//异步线程池最大线程数
private Integer whenMaxWorkers;
//异步线程池最大队列数量
private Integer whenQueueLimit;
public String getRuleSource() {
return ruleSource;
}
@@ -43,7 +54,11 @@ public class LiteflowConfig {
}
public Integer getSlotSize() {
return slotSize;
if (ObjectUtil.isNull(slotSize)){
return 1024;
}else{
return slotSize;
}
}
public void setSlotSize(Integer slotSize) {
@@ -51,7 +66,11 @@ public class LiteflowConfig {
}
public Integer getWhenMaxWaitSeconds() {
return whenMaxWaitSeconds;
if (ObjectUtil.isNull(whenMaxWaitSeconds)){
return 15;
}else{
return whenMaxWaitSeconds;
}
}
public void setWhenMaxWaitSeconds(Integer whenMaxWaitSeconds) {
@@ -59,7 +78,11 @@ public class LiteflowConfig {
}
public Integer getQueueLimit() {
return queueLimit;
if (ObjectUtil.isNull(queueLimit)){
return 200;
}else{
return queueLimit;
}
}
public void setQueueLimit(Integer queueLimit) {
@@ -67,7 +90,11 @@ public class LiteflowConfig {
}
public Long getDelay() {
return delay;
if (ObjectUtil.isNull(delay)){
return 300000L;
}else{
return delay;
}
}
public void setDelay(Long delay) {
@@ -75,7 +102,11 @@ public class LiteflowConfig {
}
public Long getPeriod() {
return period;
if (ObjectUtil.isNull(period)){
return 300000L;
}else{
return period;
}
}
public void setPeriod(Long period) {
@@ -83,10 +114,38 @@ public class LiteflowConfig {
}
public Boolean getEnableLog() {
return enableLog;
if (ObjectUtil.isNull(enableLog)){
return false;
}else{
return enableLog;
}
}
public void setEnableLog(Boolean enableLog) {
this.enableLog = enableLog;
}
public Integer getWhenMaxWorkers() {
if (ObjectUtil.isNull(whenMaxWorkers)){
return Runtime.getRuntime().availableProcessors() * 2;
}else{
return whenMaxWorkers;
}
}
public void setWhenMaxWorkers(Integer whenMaxWorkers) {
this.whenMaxWorkers = whenMaxWorkers;
}
public Integer getWhenQueueLimit() {
if (ObjectUtil.isNull(whenQueueLimit)){
return 512;
}else{
return whenQueueLimit;
}
}
public void setWhenQueueLimit(Integer whenQueueLimit) {
this.whenQueueLimit = whenQueueLimit;
}
}

View File

@@ -0,0 +1,98 @@
/**
* <p>Title: liteflow</p>
* <p>Description: 轻量级的组件式流程框架</p>
* @author Bryan.Zhang
* @email weenyc31@163.com
* @Date 2020/4/1
*/
package com.yomahub.liteflow.util;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
/**
* 线程池工具类
* @author Bryan.Zhang
*/
public class ExecutorHelper {
private ExecutorHelper() {
}
/**
* 使用默认的等待时间1分钟来关闭目标线程组。
* <p>
*
* @param pool 需要关闭的线程组.
*/
public static void shutdownAwaitTermination(ExecutorService pool) {
shutdownAwaitTermination(pool, 60L);
}
/**
* 关闭ExecutorService的线程管理者
* <p>
*
* @param pool 需要关闭的管理者
* @param timeout 等待时间
*/
public static void shutdownAwaitTermination(ExecutorService pool,
long timeout) {
pool.shutdown();
try {
if (!pool.awaitTermination(timeout, TimeUnit.SECONDS)) {
pool.shutdownNow();
if (!pool.awaitTermination(timeout, TimeUnit.SECONDS)) {
System.err.println("Pool did not terminate.");
}
}
} catch (InterruptedException ie) {
pool.shutdownNow();
Thread.currentThread().interrupt();
}
}
/**
* 返回一个线程工厂,这是一个可以定义线程名字的线程工厂,返回的线程都将被命名.
* 创建的线程都是非后台线程.
*
* @param name 名称.
* @return 线程工厂实例.
*/
public static ThreadFactory buildExecutorFactory(final String name) {
return buildExecutorFactory(name, false);
}
/**
* 返回一个线程工厂,这是一个可以定义线程名字的线程工厂,返回的线程都将被命名.
*
* @param name 名称.
* @param daemon 是否为后台线程.
* @return 线程工厂实例.
*/
public static ThreadFactory buildExecutorFactory(final String name, final boolean daemon) {
return new ThreadFactory() {
private final AtomicLong number = new AtomicLong();
@Override
public Thread newThread(Runnable r) {
Thread newThread = Executors.defaultThreadFactory().newThread(r);
newThread.setName(name + "-" + number.getAndIncrement());
newThread.setDaemon(daemon);
return newThread;
}
};
}
public static ExecutorService buildExecutor(int worker, int queue, String namePrefix, boolean daemon) {
return new ThreadPoolExecutor(worker, worker,
0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(queue),
buildExecutorFactory(namePrefix, daemon),
new ThreadPoolExecutor.AbortPolicy()
);
}
}

View File

@@ -0,0 +1,26 @@
package com.yomahub.liteflow.util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.PreDestroy;
import java.util.concurrent.ExecutorService;
/**
* 关闭shutdown类
* 执行清理工作
* @author Bryan.Zhang
*/
public class Shutdown {
private static final Logger LOG = LoggerFactory.getLogger(Shutdown.class);
@PreDestroy
public void destroy() throws Exception {
ExecutorService executorService = SpringAware.getBean("whenExecutors");
LOG.info("Start closing the liteflow-when-calls...");
ExecutorHelper.shutdownAwaitTermination(executorService);
LOG.info("Succeed closing the liteflow-when-calls ok...");
}
}

View File

@@ -28,11 +28,21 @@ public class SpringAware implements ApplicationContextAware {
}
public static <T> T getBean(String name) {
return (T) applicationContext.getBean(name);
try{
T t = (T) applicationContext.getBean(name);
return t;
}catch (Exception e){
return null;
}
}
public static <T> T getBean(Class<T> clazz) {
return applicationContext.getBean(clazz);
try{
T t = applicationContext.getBean(clazz);
return t;
}catch (Exception e){
return null;
}
}
public static <T> T registerBean(Class<T> c) {