feat #I6BDLN 绝对路径的目录位置及其所有子目录下规则配置文件的侦听

This commit is contained in:
gaibu
2023-01-28 20:09:28 +08:00
parent 3321aaa2ad
commit dea0d35303
10 changed files with 227 additions and 59 deletions

View File

@@ -18,6 +18,7 @@ import com.yomahub.liteflow.flow.LiteflowResponse;
import com.yomahub.liteflow.flow.element.Chain;
import com.yomahub.liteflow.flow.element.Node;
import com.yomahub.liteflow.flow.id.IdGeneratorHolder;
import com.yomahub.liteflow.monitor.MonitorFile;
import com.yomahub.liteflow.parser.base.FlowParser;
import com.yomahub.liteflow.parser.factory.FlowParserProvider;
import com.yomahub.liteflow.parser.spi.ParserClassNameSpi;
@@ -89,11 +90,11 @@ public class FlowExecutor {
//所有的Parser的SPI实现都是以custom形式放入的且只支持xml形式
ServiceLoader<ParserClassNameSpi> loader = ServiceLoader.load(ParserClassNameSpi.class);
Iterator<ParserClassNameSpi> it = loader.iterator();
if (it.hasNext()){
if (it.hasNext()) {
ParserClassNameSpi parserClassNameSpi = it.next();
ruleSource = "el_xml:" + parserClassNameSpi.getSpiClassName();
liteflowConfig.setRuleSource(ruleSource);
}else{
} else {
//ruleSource为空而且没有spi形式的扩展那么说明真的没有ruleSource
//这种情况有可能是基于代码动态构建的
return;
@@ -167,30 +168,37 @@ public class FlowExecutor {
}
//如果是ruleSource方式的最后判断下有没有解析出来,如果没有解析出来则报错
if (StrUtil.isBlank(liteflowConfig.getRuleSourceExtData()) && MapUtil.isEmpty(liteflowConfig.getRuleSourceExtDataMap())){
if (FlowBus.getChainMap().isEmpty()){
if (StrUtil.isBlank(liteflowConfig.getRuleSourceExtData()) && MapUtil.isEmpty(liteflowConfig.getRuleSourceExtDataMap())) {
if (FlowBus.getChainMap().isEmpty()) {
String errMsg = StrUtil.format("no valid rule config found in rule path [{}]", liteflowConfig.getRuleSource());
throw new ConfigErrorException(errMsg);
}
}
//执行钩子
if(hook){
if (hook) {
FlowInitHook.executeHook();
}
// 文件监听
if (liteflowConfig.getMonitorFileEnable()){
MonitorFile.getInstance().create();
}
}
//此方法就是从原有的配置源主动拉取新的进行刷新
//和FlowBus.refreshFlowMetaData的区别就是一个为主动拉取一个为被动监听到新的内容进行刷新
public void reloadRule() {
long start = System.currentTimeMillis();
init(false);
LOG.info("reload rules takes {}ms", System.currentTimeMillis() - start);
}
//隐式流程的调用方法
@Deprecated
public void invoke(String chainId, Object param, Integer slotIndex) throws Exception {
LiteflowResponse response = this.invoke2Resp(chainId, param, slotIndex, InnerChainTypeEnum.IN_SYNC);
if (!response.isSuccess()){
if (!response.isSuccess()) {
throw response.getCause();
}
}
@@ -198,7 +206,7 @@ public class FlowExecutor {
@Deprecated
public void invokeInAsync(String chainId, Object param, Integer slotIndex) throws Exception {
LiteflowResponse response = this.invoke2Resp(chainId, param, slotIndex, InnerChainTypeEnum.IN_ASYNC);
if (!response.isSuccess()){
if (!response.isSuccess()) {
throw response.getCause();
}
}
@@ -240,7 +248,7 @@ public class FlowExecutor {
//调用一个流程并返回Future<LiteflowResponse>,允许多上下文的传入
public Future<LiteflowResponse> execute2Future(String chainId, Object param, Class<?>... contextBeanClazzArray) {
return ExecutorHelper.loadInstance().buildMainExecutor(liteflowConfig.getMainExecutorClass()).submit(()
-> FlowExecutorHolder.loadInstance().execute2Resp(chainId, param, contextBeanClazzArray,null));
-> FlowExecutorHolder.loadInstance().execute2Resp(chainId, param, contextBeanClazzArray, null));
}
@@ -251,11 +259,11 @@ public class FlowExecutor {
//调用一个流程,返回默认的上下文,适用于简单的调用
@Deprecated
public DefaultContext execute(String chainId, Object param) throws Exception{
public DefaultContext execute(String chainId, Object param) throws Exception {
LiteflowResponse response = this.execute2Resp(chainId, param, DefaultContext.class);
if (!response.isSuccess()){
if (!response.isSuccess()) {
throw response.getCause();
}else{
} else {
return response.getFirstContextBean();
}
}
@@ -269,8 +277,8 @@ public class FlowExecutor {
}
private LiteflowResponse invoke2Resp(String chainId,
Object param,
Integer slotIndex, InnerChainTypeEnum innerChainType) {
Object param,
Integer slotIndex, InnerChainTypeEnum innerChainType) {
Slot slot = doExecute(chainId, param, null, null, slotIndex, innerChainType);
return LiteflowResponse.newInnerResponse(chainId, slot);
}
@@ -288,9 +296,9 @@ public class FlowExecutor {
//如果不是隐式流程那么需要分配Slot
if (innerChainType.equals(InnerChainTypeEnum.NONE) && ObjectUtil.isNull(slotIndex)) {
//这里可以根据class分配也可以根据bean去分配
if (ArrayUtil.isNotEmpty(contextBeanClazzArray)){
if (ArrayUtil.isNotEmpty(contextBeanClazzArray)) {
slotIndex = DataBus.offerSlotByClass(ListUtil.toList(contextBeanClazzArray));
}else{
} else {
slotIndex = DataBus.offerSlotByBean(ListUtil.toList(contextBeanArray));
}
if (BooleanUtil.isTrue(liteflowConfig.getPrintExecutionLog())) {
@@ -311,7 +319,7 @@ public class FlowExecutor {
//如果是隐式流程事先把subException给置空然后把隐式流程的chainId放入slot元数据中
//我知道这在多线程调用隐式流程中会有问题。但是考虑到这种场景的不会多,也有其他的转换方式。
//所以暂且这么做,以后再优化
if (!innerChainType.equals(InnerChainTypeEnum.NONE)){
if (!innerChainType.equals(InnerChainTypeEnum.NONE)) {
slot.removeSubException(chainId);
slot.addSubChain(chainId);
}
@@ -326,9 +334,9 @@ public class FlowExecutor {
if (ObjectUtil.isNotNull(param)) {
if (innerChainType.equals(InnerChainTypeEnum.NONE)) {
slot.setRequestData(param);
} else if(innerChainType.equals(InnerChainTypeEnum.IN_SYNC)){
} else if (innerChainType.equals(InnerChainTypeEnum.IN_SYNC)) {
slot.setChainReqData(chainId, param);
} else if(innerChainType.equals(InnerChainTypeEnum.IN_ASYNC)){
} else if (innerChainType.equals(InnerChainTypeEnum.IN_ASYNC)) {
slot.setChainReqData2Queue(chainId, param);
}
}
@@ -351,15 +359,15 @@ public class FlowExecutor {
} catch (Exception e) {
if (ObjectUtil.isNotNull(chain)) {
String errMsg = StrUtil.format("[{}]:chain[{}] execute error on slot[{}]", slot.getRequestId(), chain.getChainName(), slotIndex);
if (BooleanUtil.isTrue(liteflowConfig.getPrintExecutionLog())){
if (BooleanUtil.isTrue(liteflowConfig.getPrintExecutionLog())) {
LOG.error(errMsg, e);
}else{
} else {
LOG.error(errMsg);
}
}else{
if (BooleanUtil.isTrue(liteflowConfig.getPrintExecutionLog())){
} else {
if (BooleanUtil.isTrue(liteflowConfig.getPrintExecutionLog())) {
LOG.error(e.getMessage(), e);
}else{
} else {
LOG.error(e.getMessage());
}
}
@@ -368,7 +376,7 @@ public class FlowExecutor {
//如果是隐式流程则需要设置到隐式流程的exception属性里
if (innerChainType.equals(InnerChainTypeEnum.NONE)) {
slot.setException(e);
}else{
} else {
slot.setSubException(chainId, e);
}
} finally {

View File

@@ -0,0 +1,73 @@
package com.yomahub.liteflow.monitor;
import cn.hutool.core.io.watch.SimpleWatcher;
import cn.hutool.core.io.watch.WatchMonitor;
import cn.hutool.core.io.watch.watchers.DelayWatcher;
import cn.hutool.core.lang.Singleton;
import com.yomahub.liteflow.core.FlowExecutorHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.file.Path;
import java.nio.file.WatchEvent;
import java.util.ArrayList;
import java.util.List;
/**
* 规则文件监听器
*
* @author tangkc
*/
public class MonitorFile {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
private final List<String> PATH_LIST = new ArrayList<>();
public static MonitorFile getInstance() {
return Singleton.get(MonitorFile.class);
}
/**
* 添加监听文件路径
*
* @param filePath 文件路径
*/
public void addMonitorFilePath(String filePath) {
PATH_LIST.add(filePath);
}
/**
* 添加监听文件路径
*
* @param filePaths 文件路径
*/
public void addMonitorFilePaths(List<String> filePaths) {
PATH_LIST.addAll(filePaths);
}
/**
* 创建文件监听
*/
public void create() {
for (String filePath : PATH_LIST) {
// 这里只监听两种类型,文件修改和文件覆盖
WatchMonitor.createAll(filePath, new DelayWatcher(new SimpleWatcher() {
@Override
public void onModify(WatchEvent<?> event, Path currentPath) {
logger.info("file modify,filePath={}", filePath);
FlowExecutorHolder.loadInstance().reloadRule();
}
@Override
public void onOverflow(WatchEvent<?> event, Path currentPath) {
logger.info("file over flow,filePath={}", filePath);
FlowExecutorHolder.loadInstance().reloadRule();
}
// 在监听目录或文件时如果这个文件有修改操作JDK会多次触发modify方法为了解决这个问题
// 合并 500 毫秒内相同的变化
}, 500)).start();
}
}
}

View File

@@ -1,11 +1,13 @@
package com.yomahub.liteflow.parser.el;
import com.yomahub.liteflow.monitor.MonitorFile;
import com.yomahub.liteflow.spi.holder.PathContentParserHolder;
import java.util.List;
/**
* 基于本地的json方式EL表达式解析器
*
* @author Bryan.Zhang
* @since 2.8.0
*/
@@ -14,6 +16,11 @@ public class LocalJsonFlowELParser extends JsonFlowELParser {
@Override
public void parseMain(List<String> pathList) throws Exception {
List<String> contentList = PathContentParserHolder.loadContextAware().parseContent(pathList);
// 添加规则文件监听
List<String> fileAbsolutePath = PathContentParserHolder.loadContextAware().getFileAbsolutePath(pathList);
MonitorFile.getInstance().addMonitorFilePaths(fileAbsolutePath);
parse(contentList);
}
}

View File

@@ -1,5 +1,6 @@
package com.yomahub.liteflow.parser.el;
import com.yomahub.liteflow.monitor.MonitorFile;
import com.yomahub.liteflow.spi.holder.PathContentParserHolder;
import java.util.List;
@@ -13,6 +14,11 @@ public class LocalXmlFlowELParser extends XmlFlowELParser{
@Override
public void parseMain(List<String> pathList) throws Exception {
List<String> contentList = PathContentParserHolder.loadContextAware().parseContent(pathList);
// 添加规则文件监听
List<String> fileAbsolutePath = PathContentParserHolder.loadContextAware().getFileAbsolutePath(pathList);
MonitorFile.getInstance().addMonitorFilePaths(fileAbsolutePath);
parse(contentList);
}
}

View File

@@ -1,5 +1,6 @@
package com.yomahub.liteflow.parser.el;
import com.yomahub.liteflow.monitor.MonitorFile;
import com.yomahub.liteflow.spi.holder.PathContentParserHolder;
import java.util.List;
@@ -14,6 +15,11 @@ public class LocalYmlFlowELParser extends YmlFlowELParser {
@Override
public void parseMain(List<String> pathList) throws Exception {
List<String> contentList = PathContentParserHolder.loadContextAware().parseContent(pathList);
// 添加规则文件监听
List<String> fileAbsolutePath = PathContentParserHolder.loadContextAware().getFileAbsolutePath(pathList);
MonitorFile.getInstance().addMonitorFilePaths(fileAbsolutePath);
parse(contentList);
}

View File

@@ -94,6 +94,17 @@ public class LiteflowConfig {
//替补组件class路径
private String substituteCmpClass;
// 规则文件/脚本文件变更监听
private Boolean monitorFileEnable = Boolean.TRUE;
public Boolean getMonitorFileEnable() {
return monitorFileEnable;
}
public void setMonitorFileEnable(Boolean monitorFileEnable) {
this.monitorFileEnable = monitorFileEnable;
}
public Boolean getEnable() {
if (ObjectUtil.isNull(enable)) {
return Boolean.TRUE;

View File

@@ -5,4 +5,6 @@ import java.util.List;
public interface PathContentParser extends SpiPriority{
List<String> parseContent(List<String> pathList) throws Exception;
List<String> getFileAbsolutePath(List<String> pathList) throws Exception;
}

View File

@@ -2,6 +2,8 @@ package com.yomahub.liteflow.spi.local;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.io.FileUtil;
import cn.hutool.core.io.resource.ClassPathResource;
import cn.hutool.core.io.resource.FileResource;
import cn.hutool.core.io.resource.ResourceUtil;
import cn.hutool.core.util.StrUtil;
import com.yomahub.liteflow.exception.ConfigErrorException;
@@ -18,14 +20,14 @@ public class LocalPathContentParser implements PathContentParser {
@Override
public List<String> parseContent(List<String> pathList) throws Exception {
if(CollectionUtil.isEmpty(pathList)){
if (CollectionUtil.isEmpty(pathList)) {
throw new ConfigErrorException("rule source must not be null");
}
List<String> contentList = new ArrayList<>();
for(String path : pathList){
if (FileUtil.isAbsolutePath(path) && FileUtil.isFile(path)){
for (String path : pathList) {
if (FileUtil.isAbsolutePath(path) && FileUtil.isFile(path)) {
path = FILE_URL_PREFIX + path;
} else {
if (!path.startsWith(CLASSPATH_URL_PREFIX)) {
@@ -33,7 +35,7 @@ public class LocalPathContentParser implements PathContentParser {
}
}
String content = ResourceUtil.readUtf8Str(path);
if (StrUtil.isNotBlank(content)){
if (StrUtil.isNotBlank(content)) {
contentList.add(content);
}
}
@@ -41,6 +43,29 @@ public class LocalPathContentParser implements PathContentParser {
return contentList;
}
@Override
public List<String> getFileAbsolutePath(List<String> pathList) throws Exception {
if (CollectionUtil.isEmpty(pathList)) {
throw new ConfigErrorException("rule source must not be null");
}
List<String> result = new ArrayList<>();
for (String path : pathList) {
if (FileUtil.isAbsolutePath(path) && FileUtil.isFile(path)) {
path = FILE_URL_PREFIX + path;
result.add(new FileResource(path).getFile().getAbsolutePath());
} else {
if (!path.startsWith(CLASSPATH_URL_PREFIX)) {
path = CLASSPATH_URL_PREFIX + path;
result.add(new ClassPathResource(path).getAbsolutePath());
}
}
}
return result;
}
@Override
public int priority() {
return 2;

View File

@@ -10,7 +10,7 @@ import com.yomahub.liteflow.spi.PathContentParser;
import org.noear.solon.Utils;
import java.io.File;
import java.net.URI;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashSet;
@@ -20,7 +20,32 @@ import java.util.Set;
public class SolonPathContentParser implements PathContentParser {
@Override
public List<String> parseContent(List<String> pathList) throws Exception {
if(CollectionUtil.isEmpty(pathList)){
List<URL> allResource = getUrls(pathList);
//转换成内容List
List<String> contentList = new ArrayList<>();
for (URL resource : allResource) {
String content = IoUtil.read(resource.openStream(), CharsetUtil.CHARSET_UTF_8);
if (StrUtil.isNotBlank(content)) {
contentList.add(content);
}
}
return contentList;
}
@Override
public List<String> getFileAbsolutePath(List<String> pathList) throws Exception {
List<URL> allResource = getUrls(pathList);
List<String> result = new ArrayList<>();
for (URL url : allResource) {
result.add(url.getPath());
}
return result;
}
private static List<URL> getUrls(List<String> pathList) throws MalformedURLException {
if (CollectionUtil.isEmpty(pathList)) {
throw new ConfigErrorException("rule source must not be null");
}
@@ -44,17 +69,7 @@ public class SolonPathContentParser implements PathContentParser {
if (fileTypeSet.size() != 1) {
throw new ConfigErrorException("config error,please use the same type of configuration");
}
//转换成内容List
List<String> contentList = new ArrayList<>();
for (URL resource : allResource) {
String content = IoUtil.read(resource.openStream(), CharsetUtil.CHARSET_UTF_8);
if (StrUtil.isNotBlank(content)){
contentList.add(content);
}
}
return contentList;
return allResource;
}
@Override

View File

@@ -1,6 +1,5 @@
package com.yomahub.liteflow.spi.spring;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.collection.ListUtil;
import cn.hutool.core.io.FileUtil;
@@ -9,24 +8,49 @@ import cn.hutool.core.util.ArrayUtil;
import cn.hutool.core.util.CharsetUtil;
import cn.hutool.core.util.StrUtil;
import com.yomahub.liteflow.exception.ConfigErrorException;
import com.yomahub.liteflow.property.LiteflowConfig;
import com.yomahub.liteflow.property.LiteflowConfigGetter;
import com.yomahub.liteflow.spi.PathContentParser;
import org.springframework.core.io.Resource;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import org.springframework.core.io.support.ResourcePatternResolver;
import org.springframework.util.ResourceUtils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
public class SpringPathContentParser implements PathContentParser {
@Override
public List<String> parseContent(List<String> pathList) throws Exception {
if(CollectionUtil.isEmpty(pathList)){
List<Resource> allResource = getResources(pathList);
//转换成内容List
List<String> contentList = new ArrayList<>();
for (Resource resource : allResource) {
String content = IoUtil.read(resource.getInputStream(), CharsetUtil.CHARSET_UTF_8);
if (StrUtil.isNotBlank(content)) {
contentList.add(content);
}
}
return contentList;
}
@Override
public List<String> getFileAbsolutePath(List<String> pathList) throws Exception {
List<Resource> allResource = getResources(pathList);
//转换成内容List
List<String> result = new ArrayList<>();
for (Resource resource : allResource) {
result.add(resource.getFile().getAbsolutePath());
}
return result;
}
private List<Resource> getResources(List<String> pathList) throws IOException {
if (CollectionUtil.isEmpty(pathList)) {
throw new ConfigErrorException("rule source must not be null");
}
@@ -35,12 +59,12 @@ public class SpringPathContentParser implements PathContentParser {
String locationPattern;
//如果path是绝对路径且这个文件存在时我们认为这是一个本地文件路径而并非classpath路径
if (FileUtil.isAbsolutePath(path) && FileUtil.isFile(path)){
if (FileUtil.isAbsolutePath(path) && FileUtil.isFile(path)) {
locationPattern = ResourceUtils.FILE_URL_PREFIX + path;
} else {
if (!path.startsWith(ResourceUtils.CLASSPATH_URL_PREFIX) && !path.startsWith(ResourcePatternResolver.CLASSPATH_ALL_URL_PREFIX)) {
locationPattern = ResourceUtils.CLASSPATH_URL_PREFIX + path;
}else{
} else {
locationPattern = path;
}
}
@@ -58,19 +82,10 @@ public class SpringPathContentParser implements PathContentParser {
if (fileTypeSet.size() > 1) {
throw new ConfigErrorException("config error,please use the same type of configuration");
}
//转换成内容List
List<String> contentList = new ArrayList<>();
for (Resource resource : allResource) {
String content = IoUtil.read(resource.getInputStream(), CharsetUtil.CHARSET_UTF_8);
if (StrUtil.isNotBlank(content)){
contentList.add(content);
}
}
return contentList;
return allResource;
}
@Override
public int priority() {
return 1;