diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/FlowBus.java b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/FlowBus.java index abf1cfc78..9aefab85c 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/FlowBus.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/FlowBus.java @@ -32,6 +32,7 @@ import com.yomahub.liteflow.lifecycle.LifeCycleHolder; import com.yomahub.liteflow.log.LFLog; import com.yomahub.liteflow.log.LFLoggerManager; import com.yomahub.liteflow.meta.LiteflowMetaOperator; +import com.yomahub.liteflow.monitor.MonitorFile; import com.yomahub.liteflow.parser.el.LocalJsonFlowELParser; import com.yomahub.liteflow.parser.el.LocalXmlFlowELParser; import com.yomahub.liteflow.parser.el.LocalYmlFlowELParser; @@ -367,6 +368,8 @@ public class FlowBus { } public static void cleanCache() { + // 先清理文件监控系统,防止后续操作触发文件重载 + cleanMonitorFile(); chainMap.clear(); nodeMap.clear(); fallbackNodeMap.clear(); @@ -383,6 +386,19 @@ public class FlowBus { } } + /** + * 清理文件监控系统 + * 停止所有监控线程并清空状态 + */ + public static void cleanMonitorFile() { + try { + MonitorFile.getInstance().destroy(); + LOG.debug("MonitorFile cleaned successfully"); + } catch (Exception e) { + LOG.error("Error cleaning MonitorFile", e); + } + } + public static void refreshFlowMetaData(FlowParserTypeEnum type, String content) throws Exception { if (type.equals(FlowParserTypeEnum.TYPE_EL_XML)) { new LocalXmlFlowELParser().parse(content); diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/Chain.java b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/Chain.java index 10bb53f0d..204df21da 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/Chain.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/flow/element/Chain.java @@ -50,6 +50,8 @@ public class Chain implements Executable { private String routeEl; + private String extendsChainId; + private boolean isAbstract = false; private volatile boolean isCompiled = false; @@ -291,6 +293,14 @@ public class Chain implements Executable { isAbstract = anAbstract; } + public String getExtendsChainId() { + return extendsChainId; + } + + public void setExtendsChainId(String extendsChainId) { + this.extendsChainId = extendsChainId; + } + // 构建临时的ConditionList private List buildTemporaryConditionList() { if (StrUtil.isBlank(el)) { diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/monitor/MonitorFile.java b/liteflow-core/src/main/java/com/yomahub/liteflow/monitor/MonitorFile.java index ffb72aca0..43813f227 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/monitor/MonitorFile.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/monitor/MonitorFile.java @@ -9,6 +9,7 @@ import org.apache.commons.io.monitor.FileAlterationListenerAdaptor; import org.apache.commons.io.monitor.FileAlterationMonitor; import org.apache.commons.io.monitor.FileAlterationObserver; import java.io.File; +import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -25,6 +26,12 @@ public class MonitorFile { private final Set PATH_SET = new HashSet<>(); + // 保存所有监控器实例,用于后续清理 + private final List monitors = new ArrayList<>(); + + // 线程安全锁 + private final Object lock = new Object(); + public static MonitorFile getInstance() { return Singleton.get(MonitorFile.class); } @@ -55,41 +62,88 @@ public class MonitorFile { * 创建文件监听 */ public void create() throws Exception { - for (String path : PATH_SET) { - long interval = TimeUnit.MILLISECONDS.toMillis(2); - // 不使用过滤器 - FileAlterationObserver observer = new FileAlterationObserver(new File(path)); - observer.addListener(new FileAlterationListenerAdaptor() { - @Override - public void onFileChange(File file) { - LOG.info("file modify,filePath={}", file.getAbsolutePath()); - this.reloadRule(); - } + synchronized (lock) { + // 防止重复创建监控 + if (!monitors.isEmpty()) { + LOG.warn("Monitor already created, skipping..."); + return; + } - @Override - public void onFileDelete(File file) { - LOG.info("file delete,filePath={}", file.getAbsolutePath()); - this.reloadRule(); - } + for (String path : PATH_SET) { + long interval = TimeUnit.MILLISECONDS.toMillis(2); + // 不使用过滤器 + FileAlterationObserver observer = new FileAlterationObserver(new File(path)); + observer.addListener(new FileAlterationListenerAdaptor() { + @Override + public void onFileChange(File file) { + LOG.info("file modify,filePath={}", file.getAbsolutePath()); + this.reloadRule(); + } - @Override - public void onFileCreate(File file) { - LOG.info("file create,filePath={}", file.getAbsolutePath()); - this.reloadRule(); - } + @Override + public void onFileDelete(File file) { + LOG.info("file delete,filePath={}", file.getAbsolutePath()); + this.reloadRule(); + } - private void reloadRule() { - try { - FlowExecutorHolder.loadInstance().reloadRule(); - } catch (Exception e) { - LOG.error("reload rule error", e); - } + @Override + public void onFileCreate(File file) { + LOG.info("file create,filePath={}", file.getAbsolutePath()); + this.reloadRule(); + } + + private void reloadRule() { + try { + FlowExecutorHolder.loadInstance().reloadRule(); + } catch (Exception e) { + LOG.error("reload rule error", e); + } + } + }); + // 创建文件变化监听器 + FileAlterationMonitor monitor = new FileAlterationMonitor(interval, observer); + // 开始监控 + monitor.start(); + // 保存监控器引用,用于后续清理 + monitors.add(monitor); + } + } + } + + /** + * 停止所有文件监控并清理资源 + * 主要用于测试环境的清理,确保测试隔离 + */ + public void destroy() { + synchronized (lock) { + LOG.info("Destroying MonitorFile, stopping {} monitors", monitors.size()); + + // 停止所有监控线程 + for (FileAlterationMonitor monitor : monitors) { + try { + monitor.stop(1000); // 最多等待1秒 + LOG.debug("Monitor stopped successfully"); + } catch (Exception e) { + LOG.error("Error stopping monitor", e); } - }); - // 创建文件变化监听器 - FileAlterationMonitor monitor = new FileAlterationMonitor(interval, observer); - // 开始监控 - monitor.start(); + } + + // 清空监控器列表 + monitors.clear(); + // 清空路径集合 + PATH_SET.clear(); + + LOG.info("MonitorFile destroyed successfully"); + } + } + + /** + * 检查是否有活动的监控 + * @return true 如果有活动的监控, false 否则 + */ + public boolean isMonitoring() { + synchronized (lock) { + return !monitors.isEmpty(); } } diff --git a/liteflow-core/src/main/java/com/yomahub/liteflow/parser/helper/ParserHelper.java b/liteflow-core/src/main/java/com/yomahub/liteflow/parser/helper/ParserHelper.java index c988abc13..3105bc53e 100644 --- a/liteflow-core/src/main/java/com/yomahub/liteflow/parser/helper/ParserHelper.java +++ b/liteflow-core/src/main/java/com/yomahub/liteflow/parser/helper/ParserHelper.java @@ -133,10 +133,6 @@ public class ParserHelper { } public static void parseChainDocument(List documentList, Set chainIdSet) { - //用于存放抽象chain的map - Map abstratChainMap = new HashMap<>(); - //用于存放已经解析过的实现chain - Set implChainSet = new HashSet<>(); // 先在元数据里放上chain // 先放有一个好处,可以在parse的时候先映射到FlowBus的chainMap,然后再去解析 // 这样就不用去像之前的版本那样回归调用 @@ -163,11 +159,14 @@ public class ParserHelper { if (chain != null){ FlowBus.addChainPhase1(chain); } - }; + } }); // 清空 chainIdSet.clear(); + // 用于记录已经处理过的Chain + Set processedChainIds = new HashSet<>(); + // 这里才是真正的编译阶段 FlowBus.getChainMap().entrySet().forEach(entry -> { Chain chain = entry.getValue(); @@ -176,7 +175,8 @@ public class ParserHelper { return; } - // TODO:这里还要处理抽象chain的子chain的EL替换问题 + // 处理抽象chain的继承关系 + processChainInheritance(chain, processedChainIds); if (BooleanUtil.isFalse(chain.isCompiled())) { LiteFlowChainELBuilder.fromChain(chain).build(); @@ -221,10 +221,6 @@ public class ParserHelper { } public static void parseChainJson(List flowJsonObjectList, Set chainIdSet) { - //用于存放抽象chain的map - Map abstratChainMap = new HashMap<>(); - //用于存放已经解析过的实现chain - Set implChainSet = new HashSet<>(); // 先在元数据里放上chain // 先放有一个好处,可以在parse的时候先映射到FlowBus的chainMap,然后再去解析 // 这样就不用去像之前的版本那样回归调用 @@ -257,6 +253,9 @@ public class ParserHelper { // 清空 chainIdSet.clear(); + // 用于记录已经处理过的Chain + Set processedChainIds = new HashSet<>(); + // 这里才是真正的编译阶段 FlowBus.getChainMap().entrySet().forEach(entry -> { Chain chain = entry.getValue(); @@ -265,7 +264,8 @@ public class ParserHelper { return; } - // TODO:这里还要处理抽象chain的子chain的EL替换问题 + // 处理抽象chain的继承关系 + processChainInheritance(chain, processedChainIds); if (BooleanUtil.isFalse(chain.isCompiled())) { LiteFlowChainELBuilder.fromChain(chain).build(); @@ -275,7 +275,7 @@ public class ParserHelper { public static Chain parseOneChain(JsonNode chainNode) { // 先看是否可用 - String enableStr = chainNode.get(ENABLE) == null? StrUtil.EMPTY : chainNode.get(ENABLE).textValue(); + String enableStr = chainNode.get(ENABLE) == null? StrUtil.EMPTY : chainNode.get(ENABLE).asText(); if (StrUtil.isNotBlank(enableStr) && Boolean.FALSE.toString().equalsIgnoreCase(enableStr)) { return null; } @@ -290,6 +290,8 @@ public class ParserHelper { String threadPoolExecutorClass = chainNode.get(THREAD_POOL_EXECUTOR_CLASS) == null ? null : chainNode.get(THREAD_POOL_EXECUTOR_CLASS).textValue(); + String extendsChainId = chainNode.get(EXTENDS) == null ? null : chainNode.get(EXTENDS).textValue(); + LiteFlowChainELBuilder builder = LiteFlowChainELBuilder.createChain().setChainId(chainId).setNamespace(namespace) .setThreadPoolExecutorClass(threadPoolExecutorClass); @@ -315,6 +317,11 @@ public class ParserHelper { builder.getChain().setAbstract(true); } + // 设置继承的父chain + if (StrUtil.isNotBlank(extendsChainId)) { + builder.getChain().setExtendsChainId(extendsChainId); + } + return builder.getChain(); } @@ -336,6 +343,8 @@ public class ParserHelper { String threadPoolExecutorClass = e.attributeValue(THREAD_POOL_EXECUTOR_CLASS) == null ? null : e.attributeValue(THREAD_POOL_EXECUTOR_CLASS); + String extendsChainId = e.attributeValue(EXTENDS); + LiteFlowChainELBuilder builder = LiteFlowChainELBuilder.createChain().setChainId(chainId).setNamespace(namespace) .setThreadPoolExecutorClass(threadPoolExecutorClass); @@ -369,6 +378,11 @@ public class ParserHelper { builder.getChain().setAbstract(true); } + // 设置继承的父chain + if (StrUtil.isNotBlank(extendsChainId)) { + builder.getChain().setExtendsChainId(extendsChainId); + } + return builder.getChain(); } @@ -383,6 +397,59 @@ public class ParserHelper { } } + /** + * 处理Chain的继承关系,替换抽象Chain的占位符 + * @param chain 需要处理的Chain + * @param processedChainIds 已经处理过的Chain ID集合 + */ + private static void processChainInheritance(Chain chain, Set processedChainIds) { + if (StrUtil.isNotBlank(chain.getExtendsChainId())) { + resolveChainInheritance(chain, processedChainIds); + } + } + + /** + * 递归解析Chain的继承关系 + * @param chain 需要处理的Chain + * @param processedChainIds 已经处理过的Chain ID集合 + */ + private static void resolveChainInheritance(Chain chain, Set processedChainIds) { + // 如果已经处理过,直接返回 + if (processedChainIds.contains(chain.getChainId())) { + return; + } + + String extendsChainId = chain.getExtendsChainId(); + if (StrUtil.isBlank(extendsChainId)) { + return; + } + + // 获取父Chain + Chain parentChain = FlowBus.getChain(extendsChainId); + if (parentChain == null) { + throw new ChainNotFoundException( + StrUtil.format("[abstract chain not found] chainId={}", extendsChainId) + ); + } + + // 如果父Chain也有继承关系,先递归处理父Chain + if (StrUtil.isNotBlank(parentChain.getExtendsChainId())) { + resolveChainInheritance(parentChain, processedChainIds); + } + + // 替换当前Chain的EL表达式 + String parentEl = parentChain.getEl(); + String currentEl = chain.getEl(); + + if (StrUtil.isNotBlank(parentEl) && StrUtil.isNotBlank(currentEl)) { + String resolvedEl = ElRegexUtil.replaceAbstractChain(parentEl, currentEl); + chain.setEl(resolvedEl); + } + + // 标记为已处理 + processedChainIds.add(chain.getChainId()); + } + /** * 解析一个带继承关系的Chain,xml格式 * @param chain 实现Chain @@ -473,7 +540,7 @@ public class ParserHelper { } private static Boolean getEnableByJsonNode(JsonNode nodeObject) { - String enableStr = nodeObject.hasNonNull(ENABLE) ? nodeObject.get(ENABLE).toString() : ""; + String enableStr = nodeObject.hasNonNull(ENABLE) ? nodeObject.get(ENABLE).asText() : ""; if (StrUtil.isBlank(enableStr)) { return true; } diff --git a/liteflow-testcase-el/liteflow-testcase-el-declare-multi-solon/src/test/java/com/yomahub/liteflow/test/BaseTest.java b/liteflow-testcase-el/liteflow-testcase-el-declare-multi-solon/src/test/java/com/yomahub/liteflow/test/BaseTest.java index d64a5634f..9f34da8f8 100644 --- a/liteflow-testcase-el/liteflow-testcase-el-declare-multi-solon/src/test/java/com/yomahub/liteflow/test/BaseTest.java +++ b/liteflow-testcase-el/liteflow-testcase-el-declare-multi-solon/src/test/java/com/yomahub/liteflow/test/BaseTest.java @@ -12,7 +12,7 @@ public class BaseTest { @AfterAll public static void cleanScanCache() { - //ComponentScanner.cleanCache(); + FlowBus.cleanMonitorFile(); FlowBus.cleanCache(); ExecutorHelper.loadInstance().clearExecutorServiceMap(); SpiFactoryInitializing.clean(); diff --git a/liteflow-testcase-el/liteflow-testcase-el-declare-multi-springboot/src/test/java/com/yomahub/liteflow/test/BaseTest.java b/liteflow-testcase-el/liteflow-testcase-el-declare-multi-springboot/src/test/java/com/yomahub/liteflow/test/BaseTest.java index ba2a8f025..5009b4922 100644 --- a/liteflow-testcase-el/liteflow-testcase-el-declare-multi-springboot/src/test/java/com/yomahub/liteflow/test/BaseTest.java +++ b/liteflow-testcase-el/liteflow-testcase-el-declare-multi-springboot/src/test/java/com/yomahub/liteflow/test/BaseTest.java @@ -14,6 +14,7 @@ public class BaseTest { @AfterAll public static void cleanScanCache() { + FlowBus.cleanMonitorFile(); ComponentScanner.cleanCache(); FlowBus.cleanCache(); ExecutorHelper.loadInstance().clearExecutorServiceMap(); diff --git a/liteflow-testcase-el/liteflow-testcase-el-declare-springboot/src/test/java/com/yomahub/liteflow/test/BaseTest.java b/liteflow-testcase-el/liteflow-testcase-el-declare-springboot/src/test/java/com/yomahub/liteflow/test/BaseTest.java index 88b1fed90..610dd6843 100644 --- a/liteflow-testcase-el/liteflow-testcase-el-declare-springboot/src/test/java/com/yomahub/liteflow/test/BaseTest.java +++ b/liteflow-testcase-el/liteflow-testcase-el-declare-springboot/src/test/java/com/yomahub/liteflow/test/BaseTest.java @@ -14,6 +14,7 @@ public class BaseTest { @AfterAll public static void cleanScanCache() { + FlowBus.cleanMonitorFile(); ComponentScanner.cleanCache(); FlowBus.cleanCache(); ExecutorHelper.loadInstance().clearExecutorServiceMap(); diff --git a/liteflow-testcase-el/liteflow-testcase-el-nospring/src/test/java/com/yomahub/liteflow/test/BaseTest.java b/liteflow-testcase-el/liteflow-testcase-el-nospring/src/test/java/com/yomahub/liteflow/test/BaseTest.java index de9e9be5b..19462f374 100644 --- a/liteflow-testcase-el/liteflow-testcase-el-nospring/src/test/java/com/yomahub/liteflow/test/BaseTest.java +++ b/liteflow-testcase-el/liteflow-testcase-el-nospring/src/test/java/com/yomahub/liteflow/test/BaseTest.java @@ -13,6 +13,7 @@ public class BaseTest { @AfterAll public static void cleanScanCache() { + FlowBus.cleanMonitorFile(); FlowBus.cleanCache(); ExecutorHelper.loadInstance().clearExecutorServiceMap(); SpiFactoryInitializing.clean(); diff --git a/liteflow-testcase-el/liteflow-testcase-el-solon/src/test/java/com/yomahub/liteflow/test/BaseTest.java b/liteflow-testcase-el/liteflow-testcase-el-solon/src/test/java/com/yomahub/liteflow/test/BaseTest.java index 5865761b6..f737bb4e8 100644 --- a/liteflow-testcase-el/liteflow-testcase-el-solon/src/test/java/com/yomahub/liteflow/test/BaseTest.java +++ b/liteflow-testcase-el/liteflow-testcase-el-solon/src/test/java/com/yomahub/liteflow/test/BaseTest.java @@ -12,6 +12,7 @@ public class BaseTest { @AfterAll public static void cleanScanCache() { + FlowBus.cleanMonitorFile(); FlowBus.cleanCache(); ExecutorHelper.loadInstance().clearExecutorServiceMap(); SpiFactoryInitializing.clean(); diff --git a/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/BaseTest.java b/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/BaseTest.java index 88b1fed90..610dd6843 100644 --- a/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/BaseTest.java +++ b/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/BaseTest.java @@ -14,6 +14,7 @@ public class BaseTest { @AfterAll public static void cleanScanCache() { + FlowBus.cleanMonitorFile(); ComponentScanner.cleanCache(); FlowBus.cleanCache(); ExecutorHelper.loadInstance().clearExecutorServiceMap(); diff --git a/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/monitorFile/MonitorFileELSpringbootTest.java b/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/monitorFile/MonitorFileELSpringbootTest.java index 367a0a306..da659a703 100644 --- a/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/monitorFile/MonitorFileELSpringbootTest.java +++ b/liteflow-testcase-el/liteflow-testcase-el-springboot/src/test/java/com/yomahub/liteflow/test/monitorFile/MonitorFileELSpringbootTest.java @@ -79,9 +79,13 @@ public class MonitorFileELSpringbootTest extends BaseTest { } @AfterEach - public void afterEach(){ + public void afterEach() throws InterruptedException { String absolutePath = new ClassPathResource("classpath:/monitorFile/flow.el.xml").getAbsolutePath(); FileUtil.writeString("THEN(a, b, c);", new File(absolutePath), CharsetUtil.CHARSET_UTF_8); + + // 等待文件监控处理完成,防止污染下一个测试 + // 监控间隔是2ms,等待100ms足够处理完成 + Thread.sleep(100); } } diff --git a/liteflow-testcase-el/liteflow-testcase-el-springnative/src/test/java/com/yomahub/liteflow/test/BaseTest.java b/liteflow-testcase-el/liteflow-testcase-el-springnative/src/test/java/com/yomahub/liteflow/test/BaseTest.java index 88b1fed90..610dd6843 100644 --- a/liteflow-testcase-el/liteflow-testcase-el-springnative/src/test/java/com/yomahub/liteflow/test/BaseTest.java +++ b/liteflow-testcase-el/liteflow-testcase-el-springnative/src/test/java/com/yomahub/liteflow/test/BaseTest.java @@ -14,6 +14,7 @@ public class BaseTest { @AfterAll public static void cleanScanCache() { + FlowBus.cleanMonitorFile(); ComponentScanner.cleanCache(); FlowBus.cleanCache(); ExecutorHelper.loadInstance().clearExecutorServiceMap();