diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/core/FlowExecutor.java b/liteflow-core/src/main/java/com/yomahub/liteflow/core/FlowExecutor.java index da831e266..f9e99541b 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/core/FlowExecutor.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/core/FlowExecutor.java @@ -18,6 +18,7 @@ import com.yomahub.liteflow.flow.LiteflowResponse; import com.yomahub.liteflow.flow.element.Chain; import com.yomahub.liteflow.flow.element.Node; import com.yomahub.liteflow.flow.id.IdGeneratorHolder; +import com.yomahub.liteflow.monitor.MonitorFile; import com.yomahub.liteflow.parser.base.FlowParser; import com.yomahub.liteflow.parser.factory.FlowParserProvider; import com.yomahub.liteflow.parser.spi.ParserClassNameSpi; @@ -89,11 +90,11 @@ public class FlowExecutor { //所有的Parser的SPI实现都是以custom形式放入的,且只支持xml形式 ServiceLoader loader = ServiceLoader.load(ParserClassNameSpi.class); Iterator it = loader.iterator(); - if (it.hasNext()){ + if (it.hasNext()) { ParserClassNameSpi parserClassNameSpi = it.next(); ruleSource = "el_xml:" + parserClassNameSpi.getSpiClassName(); liteflowConfig.setRuleSource(ruleSource); - }else{ + } else { //ruleSource为空,而且没有spi形式的扩展,那么说明真的没有ruleSource //这种情况有可能是基于代码动态构建的 return; @@ -167,30 +168,37 @@ public class FlowExecutor { } //如果是ruleSource方式的,最后判断下有没有解析出来,如果没有解析出来则报错 - if (StrUtil.isBlank(liteflowConfig.getRuleSourceExtData()) && MapUtil.isEmpty(liteflowConfig.getRuleSourceExtDataMap())){ - if (FlowBus.getChainMap().isEmpty()){ + if (StrUtil.isBlank(liteflowConfig.getRuleSourceExtData()) && MapUtil.isEmpty(liteflowConfig.getRuleSourceExtDataMap())) { + if (FlowBus.getChainMap().isEmpty()) { String errMsg = StrUtil.format("no valid rule config found in rule path [{}]", liteflowConfig.getRuleSource()); throw new ConfigErrorException(errMsg); } } //执行钩子 - if(hook){ + if (hook) { FlowInitHook.executeHook(); } + + // 文件监听 + if (liteflowConfig.getMonitorFileEnable()){ + MonitorFile.getInstance().create(); + } } //此方法就是从原有的配置源主动拉取新的进行刷新 //和FlowBus.refreshFlowMetaData的区别就是一个为主动拉取,一个为被动监听到新的内容进行刷新 public void reloadRule() { + long start = System.currentTimeMillis(); init(false); + LOG.info("reload rules takes {}ms", System.currentTimeMillis() - start); } //隐式流程的调用方法 @Deprecated public void invoke(String chainId, Object param, Integer slotIndex) throws Exception { LiteflowResponse response = this.invoke2Resp(chainId, param, slotIndex, InnerChainTypeEnum.IN_SYNC); - if (!response.isSuccess()){ + if (!response.isSuccess()) { throw response.getCause(); } } @@ -198,7 +206,7 @@ public class FlowExecutor { @Deprecated public void invokeInAsync(String chainId, Object param, Integer slotIndex) throws Exception { LiteflowResponse response = this.invoke2Resp(chainId, param, slotIndex, InnerChainTypeEnum.IN_ASYNC); - if (!response.isSuccess()){ + if (!response.isSuccess()) { throw response.getCause(); } } @@ -240,7 +248,7 @@ public class FlowExecutor { //调用一个流程并返回Future,允许多上下文的传入 public Future execute2Future(String chainId, Object param, Class... contextBeanClazzArray) { return ExecutorHelper.loadInstance().buildMainExecutor(liteflowConfig.getMainExecutorClass()).submit(() - -> FlowExecutorHolder.loadInstance().execute2Resp(chainId, param, contextBeanClazzArray,null)); + -> FlowExecutorHolder.loadInstance().execute2Resp(chainId, param, contextBeanClazzArray, null)); } @@ -251,11 +259,11 @@ public class FlowExecutor { //调用一个流程,返回默认的上下文,适用于简单的调用 @Deprecated - public DefaultContext execute(String chainId, Object param) throws Exception{ + public DefaultContext execute(String chainId, Object param) throws Exception { LiteflowResponse response = this.execute2Resp(chainId, param, DefaultContext.class); - if (!response.isSuccess()){ + if (!response.isSuccess()) { throw response.getCause(); - }else{ + } else { return response.getFirstContextBean(); } } @@ -269,8 +277,8 @@ public class FlowExecutor { } private LiteflowResponse invoke2Resp(String chainId, - Object param, - Integer slotIndex, InnerChainTypeEnum innerChainType) { + Object param, + Integer slotIndex, InnerChainTypeEnum innerChainType) { Slot slot = doExecute(chainId, param, null, null, slotIndex, innerChainType); return LiteflowResponse.newInnerResponse(chainId, slot); } @@ -288,9 +296,9 @@ public class FlowExecutor { //如果不是隐式流程,那么需要分配Slot if (innerChainType.equals(InnerChainTypeEnum.NONE) && ObjectUtil.isNull(slotIndex)) { //这里可以根据class分配,也可以根据bean去分配 - if (ArrayUtil.isNotEmpty(contextBeanClazzArray)){ + if (ArrayUtil.isNotEmpty(contextBeanClazzArray)) { slotIndex = DataBus.offerSlotByClass(ListUtil.toList(contextBeanClazzArray)); - }else{ + } else { slotIndex = DataBus.offerSlotByBean(ListUtil.toList(contextBeanArray)); } if (BooleanUtil.isTrue(liteflowConfig.getPrintExecutionLog())) { @@ -311,7 +319,7 @@ public class FlowExecutor { //如果是隐式流程,事先把subException给置空,然后把隐式流程的chainId放入slot元数据中 //我知道这在多线程调用隐式流程中会有问题。但是考虑到这种场景的不会多,也有其他的转换方式。 //所以暂且这么做,以后再优化 - if (!innerChainType.equals(InnerChainTypeEnum.NONE)){ + if (!innerChainType.equals(InnerChainTypeEnum.NONE)) { slot.removeSubException(chainId); slot.addSubChain(chainId); } @@ -326,9 +334,9 @@ public class FlowExecutor { if (ObjectUtil.isNotNull(param)) { if (innerChainType.equals(InnerChainTypeEnum.NONE)) { slot.setRequestData(param); - } else if(innerChainType.equals(InnerChainTypeEnum.IN_SYNC)){ + } else if (innerChainType.equals(InnerChainTypeEnum.IN_SYNC)) { slot.setChainReqData(chainId, param); - } else if(innerChainType.equals(InnerChainTypeEnum.IN_ASYNC)){ + } else if (innerChainType.equals(InnerChainTypeEnum.IN_ASYNC)) { slot.setChainReqData2Queue(chainId, param); } } @@ -351,15 +359,15 @@ public class FlowExecutor { } catch (Exception e) { if (ObjectUtil.isNotNull(chain)) { String errMsg = StrUtil.format("[{}]:chain[{}] execute error on slot[{}]", slot.getRequestId(), chain.getChainName(), slotIndex); - if (BooleanUtil.isTrue(liteflowConfig.getPrintExecutionLog())){ + if (BooleanUtil.isTrue(liteflowConfig.getPrintExecutionLog())) { LOG.error(errMsg, e); - }else{ + } else { LOG.error(errMsg); } - }else{ - if (BooleanUtil.isTrue(liteflowConfig.getPrintExecutionLog())){ + } else { + if (BooleanUtil.isTrue(liteflowConfig.getPrintExecutionLog())) { LOG.error(e.getMessage(), e); - }else{ + } else { LOG.error(e.getMessage()); } } @@ -368,7 +376,7 @@ public class FlowExecutor { //如果是隐式流程,则需要设置到隐式流程的exception属性里 if (innerChainType.equals(InnerChainTypeEnum.NONE)) { slot.setException(e); - }else{ + } else { slot.setSubException(chainId, e); } } finally { diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/monitor/MonitorFile.java b/liteflow-core/src/main/java/com/yomahub/liteflow/monitor/MonitorFile.java new file mode 100644 index 000000000..c60e51b66 --- /dev/null +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/monitor/MonitorFile.java @@ -0,0 +1,73 @@ +package com.yomahub.liteflow.monitor; + +import cn.hutool.core.io.watch.SimpleWatcher; +import cn.hutool.core.io.watch.WatchMonitor; +import cn.hutool.core.io.watch.watchers.DelayWatcher; +import cn.hutool.core.lang.Singleton; +import com.yomahub.liteflow.core.FlowExecutorHolder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.file.Path; +import java.nio.file.WatchEvent; +import java.util.ArrayList; +import java.util.List; + +/** + * 规则文件监听器 + * + * @author tangkc + */ +public class MonitorFile { + + private final Logger logger = LoggerFactory.getLogger(this.getClass()); + private final List PATH_LIST = new ArrayList<>(); + + public static MonitorFile getInstance() { + return Singleton.get(MonitorFile.class); + } + + /** + * 添加监听文件路径 + * + * @param filePath 文件路径 + */ + public void addMonitorFilePath(String filePath) { + PATH_LIST.add(filePath); + } + + /** + * 添加监听文件路径 + * + * @param filePaths 文件路径 + */ + public void addMonitorFilePaths(List filePaths) { + PATH_LIST.addAll(filePaths); + } + + /** + * 创建文件监听 + */ + public void create() { + for (String filePath : PATH_LIST) { + // 这里只监听两种类型,文件修改和文件覆盖 + WatchMonitor.createAll(filePath, new DelayWatcher(new SimpleWatcher() { + + @Override + public void onModify(WatchEvent event, Path currentPath) { + logger.info("file modify,filePath={}", filePath); + FlowExecutorHolder.loadInstance().reloadRule(); + } + + @Override + public void onOverflow(WatchEvent event, Path currentPath) { + logger.info("file over flow,filePath={}", filePath); + FlowExecutorHolder.loadInstance().reloadRule(); + } + // 在监听目录或文件时,如果这个文件有修改操作,JDK会多次触发modify方法,为了解决这个问题 + // 合并 500 毫秒内相同的变化 + }, 500)).start(); + } + } + +} diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/parser/el/LocalJsonFlowELParser.java b/liteflow-core/src/main/java/com/yomahub/liteflow/parser/el/LocalJsonFlowELParser.java index c343b37c6..3386c45db 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/parser/el/LocalJsonFlowELParser.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/parser/el/LocalJsonFlowELParser.java @@ -1,11 +1,13 @@ package com.yomahub.liteflow.parser.el; +import com.yomahub.liteflow.monitor.MonitorFile; import com.yomahub.liteflow.spi.holder.PathContentParserHolder; import java.util.List; /** * 基于本地的json方式EL表达式解析器 + * * @author Bryan.Zhang * @since 2.8.0 */ @@ -14,6 +16,11 @@ public class LocalJsonFlowELParser extends JsonFlowELParser { @Override public void parseMain(List pathList) throws Exception { List contentList = PathContentParserHolder.loadContextAware().parseContent(pathList); + + // 添加规则文件监听 + List fileAbsolutePath = PathContentParserHolder.loadContextAware().getFileAbsolutePath(pathList); + MonitorFile.getInstance().addMonitorFilePaths(fileAbsolutePath); + parse(contentList); } } diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/parser/el/LocalXmlFlowELParser.java b/liteflow-core/src/main/java/com/yomahub/liteflow/parser/el/LocalXmlFlowELParser.java index d0769f164..01e3e61e6 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/parser/el/LocalXmlFlowELParser.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/parser/el/LocalXmlFlowELParser.java @@ -1,5 +1,6 @@ package com.yomahub.liteflow.parser.el; +import com.yomahub.liteflow.monitor.MonitorFile; import com.yomahub.liteflow.spi.holder.PathContentParserHolder; import java.util.List; @@ -13,6 +14,11 @@ public class LocalXmlFlowELParser extends XmlFlowELParser{ @Override public void parseMain(List pathList) throws Exception { List contentList = PathContentParserHolder.loadContextAware().parseContent(pathList); + + // 添加规则文件监听 + List fileAbsolutePath = PathContentParserHolder.loadContextAware().getFileAbsolutePath(pathList); + MonitorFile.getInstance().addMonitorFilePaths(fileAbsolutePath); + parse(contentList); } } diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/parser/el/LocalYmlFlowELParser.java b/liteflow-core/src/main/java/com/yomahub/liteflow/parser/el/LocalYmlFlowELParser.java index 8c85abef7..5cc92ef9c 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/parser/el/LocalYmlFlowELParser.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/parser/el/LocalYmlFlowELParser.java @@ -1,5 +1,6 @@ package com.yomahub.liteflow.parser.el; +import com.yomahub.liteflow.monitor.MonitorFile; import com.yomahub.liteflow.spi.holder.PathContentParserHolder; import java.util.List; @@ -14,6 +15,11 @@ public class LocalYmlFlowELParser extends YmlFlowELParser { @Override public void parseMain(List pathList) throws Exception { List contentList = PathContentParserHolder.loadContextAware().parseContent(pathList); + + // 添加规则文件监听 + List fileAbsolutePath = PathContentParserHolder.loadContextAware().getFileAbsolutePath(pathList); + MonitorFile.getInstance().addMonitorFilePaths(fileAbsolutePath); + parse(contentList); } diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/property/LiteflowConfig.java b/liteflow-core/src/main/java/com/yomahub/liteflow/property/LiteflowConfig.java index 072671d95..a26162917 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/property/LiteflowConfig.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/property/LiteflowConfig.java @@ -94,6 +94,17 @@ public class LiteflowConfig { //替补组件class路径 private String substituteCmpClass; + // 规则文件/脚本文件变更监听 + private Boolean monitorFileEnable = Boolean.TRUE; + + public Boolean getMonitorFileEnable() { + return monitorFileEnable; + } + + public void setMonitorFileEnable(Boolean monitorFileEnable) { + this.monitorFileEnable = monitorFileEnable; + } + public Boolean getEnable() { if (ObjectUtil.isNull(enable)) { return Boolean.TRUE; diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/spi/PathContentParser.java b/liteflow-core/src/main/java/com/yomahub/liteflow/spi/PathContentParser.java index 10682a487..93f7a5117 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/spi/PathContentParser.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/spi/PathContentParser.java @@ -5,4 +5,6 @@ import java.util.List; public interface PathContentParser extends SpiPriority{ List parseContent(List pathList) throws Exception; + + List getFileAbsolutePath(List pathList) throws Exception; } diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/spi/local/LocalPathContentParser.java b/liteflow-core/src/main/java/com/yomahub/liteflow/spi/local/LocalPathContentParser.java index af7387b9e..5dc0bf808 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/spi/local/LocalPathContentParser.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/spi/local/LocalPathContentParser.java @@ -2,6 +2,8 @@ package com.yomahub.liteflow.spi.local; import cn.hutool.core.collection.CollectionUtil; import cn.hutool.core.io.FileUtil; +import cn.hutool.core.io.resource.ClassPathResource; +import cn.hutool.core.io.resource.FileResource; import cn.hutool.core.io.resource.ResourceUtil; import cn.hutool.core.util.StrUtil; import com.yomahub.liteflow.exception.ConfigErrorException; @@ -18,14 +20,14 @@ public class LocalPathContentParser implements PathContentParser { @Override public List parseContent(List pathList) throws Exception { - if(CollectionUtil.isEmpty(pathList)){ + if (CollectionUtil.isEmpty(pathList)) { throw new ConfigErrorException("rule source must not be null"); } List contentList = new ArrayList<>(); - for(String path : pathList){ - if (FileUtil.isAbsolutePath(path) && FileUtil.isFile(path)){ + for (String path : pathList) { + if (FileUtil.isAbsolutePath(path) && FileUtil.isFile(path)) { path = FILE_URL_PREFIX + path; } else { if (!path.startsWith(CLASSPATH_URL_PREFIX)) { @@ -33,7 +35,7 @@ public class LocalPathContentParser implements PathContentParser { } } String content = ResourceUtil.readUtf8Str(path); - if (StrUtil.isNotBlank(content)){ + if (StrUtil.isNotBlank(content)) { contentList.add(content); } } @@ -41,6 +43,29 @@ public class LocalPathContentParser implements PathContentParser { return contentList; } + @Override + public List getFileAbsolutePath(List pathList) throws Exception { + if (CollectionUtil.isEmpty(pathList)) { + throw new ConfigErrorException("rule source must not be null"); + } + + List result = new ArrayList<>(); + + for (String path : pathList) { + if (FileUtil.isAbsolutePath(path) && FileUtil.isFile(path)) { + path = FILE_URL_PREFIX + path; + result.add(new FileResource(path).getFile().getAbsolutePath()); + } else { + if (!path.startsWith(CLASSPATH_URL_PREFIX)) { + path = CLASSPATH_URL_PREFIX + path; + result.add(new ClassPathResource(path).getAbsolutePath()); + } + } + } + + return result; + } + @Override public int priority() { return 2; diff --git a/liteflow-solon-plugin/src/main/java/com/yomahub/liteflow/spi/solon/SolonPathContentParser.java b/liteflow-solon-plugin/src/main/java/com/yomahub/liteflow/spi/solon/SolonPathContentParser.java index b2774d0f1..aa21f1b6c 100644 --- a/liteflow-solon-plugin/src/main/java/com/yomahub/liteflow/spi/solon/SolonPathContentParser.java +++ b/liteflow-solon-plugin/src/main/java/com/yomahub/liteflow/spi/solon/SolonPathContentParser.java @@ -10,7 +10,7 @@ import com.yomahub.liteflow.spi.PathContentParser; import org.noear.solon.Utils; import java.io.File; -import java.net.URI; +import java.net.MalformedURLException; import java.net.URL; import java.util.ArrayList; import java.util.HashSet; @@ -20,7 +20,32 @@ import java.util.Set; public class SolonPathContentParser implements PathContentParser { @Override public List parseContent(List pathList) throws Exception { - if(CollectionUtil.isEmpty(pathList)){ + List allResource = getUrls(pathList); + + //转换成内容List + List contentList = new ArrayList<>(); + for (URL resource : allResource) { + String content = IoUtil.read(resource.openStream(), CharsetUtil.CHARSET_UTF_8); + if (StrUtil.isNotBlank(content)) { + contentList.add(content); + } + } + + return contentList; + } + + @Override + public List getFileAbsolutePath(List pathList) throws Exception { + List allResource = getUrls(pathList); + List result = new ArrayList<>(); + for (URL url : allResource) { + result.add(url.getPath()); + } + return result; + } + + private static List getUrls(List pathList) throws MalformedURLException { + if (CollectionUtil.isEmpty(pathList)) { throw new ConfigErrorException("rule source must not be null"); } @@ -44,17 +69,7 @@ public class SolonPathContentParser implements PathContentParser { if (fileTypeSet.size() != 1) { throw new ConfigErrorException("config error,please use the same type of configuration"); } - - //转换成内容List - List contentList = new ArrayList<>(); - for (URL resource : allResource) { - String content = IoUtil.read(resource.openStream(), CharsetUtil.CHARSET_UTF_8); - if (StrUtil.isNotBlank(content)){ - contentList.add(content); - } - } - - return contentList; + return allResource; } @Override diff --git a/liteflow-spring/src/main/java/com/yomahub/liteflow/spi/spring/SpringPathContentParser.java b/liteflow-spring/src/main/java/com/yomahub/liteflow/spi/spring/SpringPathContentParser.java index 791f2789e..f0f4f5157 100644 --- a/liteflow-spring/src/main/java/com/yomahub/liteflow/spi/spring/SpringPathContentParser.java +++ b/liteflow-spring/src/main/java/com/yomahub/liteflow/spi/spring/SpringPathContentParser.java @@ -1,6 +1,5 @@ package com.yomahub.liteflow.spi.spring; -import cn.hutool.core.collection.CollUtil; import cn.hutool.core.collection.CollectionUtil; import cn.hutool.core.collection.ListUtil; import cn.hutool.core.io.FileUtil; @@ -9,24 +8,49 @@ import cn.hutool.core.util.ArrayUtil; import cn.hutool.core.util.CharsetUtil; import cn.hutool.core.util.StrUtil; import com.yomahub.liteflow.exception.ConfigErrorException; -import com.yomahub.liteflow.property.LiteflowConfig; -import com.yomahub.liteflow.property.LiteflowConfigGetter; import com.yomahub.liteflow.spi.PathContentParser; import org.springframework.core.io.Resource; import org.springframework.core.io.support.PathMatchingResourcePatternResolver; import org.springframework.core.io.support.ResourcePatternResolver; import org.springframework.util.ResourceUtils; +import java.io.IOException; import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Set; -import java.util.stream.Collectors; public class SpringPathContentParser implements PathContentParser { @Override public List parseContent(List pathList) throws Exception { - if(CollectionUtil.isEmpty(pathList)){ + List allResource = getResources(pathList); + + //转换成内容List + List contentList = new ArrayList<>(); + for (Resource resource : allResource) { + String content = IoUtil.read(resource.getInputStream(), CharsetUtil.CHARSET_UTF_8); + if (StrUtil.isNotBlank(content)) { + contentList.add(content); + } + } + + return contentList; + } + + @Override + public List getFileAbsolutePath(List pathList) throws Exception { + List allResource = getResources(pathList); + + //转换成内容List + List result = new ArrayList<>(); + for (Resource resource : allResource) { + result.add(resource.getFile().getAbsolutePath()); + } + return result; + } + + private List getResources(List pathList) throws IOException { + if (CollectionUtil.isEmpty(pathList)) { throw new ConfigErrorException("rule source must not be null"); } @@ -35,12 +59,12 @@ public class SpringPathContentParser implements PathContentParser { String locationPattern; //如果path是绝对路径且这个文件存在时,我们认为这是一个本地文件路径,而并非classpath路径 - if (FileUtil.isAbsolutePath(path) && FileUtil.isFile(path)){ + if (FileUtil.isAbsolutePath(path) && FileUtil.isFile(path)) { locationPattern = ResourceUtils.FILE_URL_PREFIX + path; } else { if (!path.startsWith(ResourceUtils.CLASSPATH_URL_PREFIX) && !path.startsWith(ResourcePatternResolver.CLASSPATH_ALL_URL_PREFIX)) { locationPattern = ResourceUtils.CLASSPATH_URL_PREFIX + path; - }else{ + } else { locationPattern = path; } } @@ -58,19 +82,10 @@ public class SpringPathContentParser implements PathContentParser { if (fileTypeSet.size() > 1) { throw new ConfigErrorException("config error,please use the same type of configuration"); } - - //转换成内容List - List contentList = new ArrayList<>(); - for (Resource resource : allResource) { - String content = IoUtil.read(resource.getInputStream(), CharsetUtil.CHARSET_UTF_8); - if (StrUtil.isNotBlank(content)){ - contentList.add(content); - } - } - - return contentList; + return allResource; } + @Override public int priority() { return 1;