mirror of
https://gitee.com/dromara/liteFlow.git
synced 2026-06-10 03:07:32 +08:00
bug #ID7OTO bind对象为chain时,chain的定义顺序影响了bind数据的获取
重写了整个底层的parser逻辑
This commit is contained in:
@@ -32,6 +32,7 @@ import com.yomahub.liteflow.lifecycle.LifeCycleHolder;
|
||||
import com.yomahub.liteflow.log.LFLog;
|
||||
import com.yomahub.liteflow.log.LFLoggerManager;
|
||||
import com.yomahub.liteflow.meta.LiteflowMetaOperator;
|
||||
import com.yomahub.liteflow.monitor.MonitorFile;
|
||||
import com.yomahub.liteflow.parser.el.LocalJsonFlowELParser;
|
||||
import com.yomahub.liteflow.parser.el.LocalXmlFlowELParser;
|
||||
import com.yomahub.liteflow.parser.el.LocalYmlFlowELParser;
|
||||
@@ -367,6 +368,8 @@ public class FlowBus {
|
||||
}
|
||||
|
||||
public static void cleanCache() {
|
||||
// 先清理文件监控系统,防止后续操作触发文件重载
|
||||
cleanMonitorFile();
|
||||
chainMap.clear();
|
||||
nodeMap.clear();
|
||||
fallbackNodeMap.clear();
|
||||
@@ -383,6 +386,19 @@ public class FlowBus {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 清理文件监控系统
|
||||
* 停止所有监控线程并清空状态
|
||||
*/
|
||||
public static void cleanMonitorFile() {
|
||||
try {
|
||||
MonitorFile.getInstance().destroy();
|
||||
LOG.debug("MonitorFile cleaned successfully");
|
||||
} catch (Exception e) {
|
||||
LOG.error("Error cleaning MonitorFile", e);
|
||||
}
|
||||
}
|
||||
|
||||
public static void refreshFlowMetaData(FlowParserTypeEnum type, String content) throws Exception {
|
||||
if (type.equals(FlowParserTypeEnum.TYPE_EL_XML)) {
|
||||
new LocalXmlFlowELParser().parse(content);
|
||||
|
||||
@@ -50,6 +50,8 @@ public class Chain implements Executable {
|
||||
|
||||
private String routeEl;
|
||||
|
||||
private String extendsChainId;
|
||||
|
||||
private boolean isAbstract = false;
|
||||
|
||||
private volatile boolean isCompiled = false;
|
||||
@@ -291,6 +293,14 @@ public class Chain implements Executable {
|
||||
isAbstract = anAbstract;
|
||||
}
|
||||
|
||||
public String getExtendsChainId() {
|
||||
return extendsChainId;
|
||||
}
|
||||
|
||||
public void setExtendsChainId(String extendsChainId) {
|
||||
this.extendsChainId = extendsChainId;
|
||||
}
|
||||
|
||||
// 构建临时的ConditionList
|
||||
private List<Condition> buildTemporaryConditionList() {
|
||||
if (StrUtil.isBlank(el)) {
|
||||
|
||||
@@ -9,6 +9,7 @@ import org.apache.commons.io.monitor.FileAlterationListenerAdaptor;
|
||||
import org.apache.commons.io.monitor.FileAlterationMonitor;
|
||||
import org.apache.commons.io.monitor.FileAlterationObserver;
|
||||
import java.io.File;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
@@ -25,6 +26,12 @@ public class MonitorFile {
|
||||
|
||||
private final Set<String> PATH_SET = new HashSet<>();
|
||||
|
||||
// 保存所有监控器实例,用于后续清理
|
||||
private final List<FileAlterationMonitor> monitors = new ArrayList<>();
|
||||
|
||||
// 线程安全锁
|
||||
private final Object lock = new Object();
|
||||
|
||||
public static MonitorFile getInstance() {
|
||||
return Singleton.get(MonitorFile.class);
|
||||
}
|
||||
@@ -55,41 +62,88 @@ public class MonitorFile {
|
||||
* 创建文件监听
|
||||
*/
|
||||
public void create() throws Exception {
|
||||
for (String path : PATH_SET) {
|
||||
long interval = TimeUnit.MILLISECONDS.toMillis(2);
|
||||
// 不使用过滤器
|
||||
FileAlterationObserver observer = new FileAlterationObserver(new File(path));
|
||||
observer.addListener(new FileAlterationListenerAdaptor() {
|
||||
@Override
|
||||
public void onFileChange(File file) {
|
||||
LOG.info("file modify,filePath={}", file.getAbsolutePath());
|
||||
this.reloadRule();
|
||||
}
|
||||
synchronized (lock) {
|
||||
// 防止重复创建监控
|
||||
if (!monitors.isEmpty()) {
|
||||
LOG.warn("Monitor already created, skipping...");
|
||||
return;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFileDelete(File file) {
|
||||
LOG.info("file delete,filePath={}", file.getAbsolutePath());
|
||||
this.reloadRule();
|
||||
}
|
||||
for (String path : PATH_SET) {
|
||||
long interval = TimeUnit.MILLISECONDS.toMillis(2);
|
||||
// 不使用过滤器
|
||||
FileAlterationObserver observer = new FileAlterationObserver(new File(path));
|
||||
observer.addListener(new FileAlterationListenerAdaptor() {
|
||||
@Override
|
||||
public void onFileChange(File file) {
|
||||
LOG.info("file modify,filePath={}", file.getAbsolutePath());
|
||||
this.reloadRule();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFileCreate(File file) {
|
||||
LOG.info("file create,filePath={}", file.getAbsolutePath());
|
||||
this.reloadRule();
|
||||
}
|
||||
@Override
|
||||
public void onFileDelete(File file) {
|
||||
LOG.info("file delete,filePath={}", file.getAbsolutePath());
|
||||
this.reloadRule();
|
||||
}
|
||||
|
||||
private void reloadRule() {
|
||||
try {
|
||||
FlowExecutorHolder.loadInstance().reloadRule();
|
||||
} catch (Exception e) {
|
||||
LOG.error("reload rule error", e);
|
||||
}
|
||||
@Override
|
||||
public void onFileCreate(File file) {
|
||||
LOG.info("file create,filePath={}", file.getAbsolutePath());
|
||||
this.reloadRule();
|
||||
}
|
||||
|
||||
private void reloadRule() {
|
||||
try {
|
||||
FlowExecutorHolder.loadInstance().reloadRule();
|
||||
} catch (Exception e) {
|
||||
LOG.error("reload rule error", e);
|
||||
}
|
||||
}
|
||||
});
|
||||
// 创建文件变化监听器
|
||||
FileAlterationMonitor monitor = new FileAlterationMonitor(interval, observer);
|
||||
// 开始监控
|
||||
monitor.start();
|
||||
// 保存监控器引用,用于后续清理
|
||||
monitors.add(monitor);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 停止所有文件监控并清理资源
|
||||
* 主要用于测试环境的清理,确保测试隔离
|
||||
*/
|
||||
public void destroy() {
|
||||
synchronized (lock) {
|
||||
LOG.info("Destroying MonitorFile, stopping {} monitors", monitors.size());
|
||||
|
||||
// 停止所有监控线程
|
||||
for (FileAlterationMonitor monitor : monitors) {
|
||||
try {
|
||||
monitor.stop(1000); // 最多等待1秒
|
||||
LOG.debug("Monitor stopped successfully");
|
||||
} catch (Exception e) {
|
||||
LOG.error("Error stopping monitor", e);
|
||||
}
|
||||
});
|
||||
// 创建文件变化监听器
|
||||
FileAlterationMonitor monitor = new FileAlterationMonitor(interval, observer);
|
||||
// 开始监控
|
||||
monitor.start();
|
||||
}
|
||||
|
||||
// 清空监控器列表
|
||||
monitors.clear();
|
||||
// 清空路径集合
|
||||
PATH_SET.clear();
|
||||
|
||||
LOG.info("MonitorFile destroyed successfully");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 检查是否有活动的监控
|
||||
* @return true 如果有活动的监控, false 否则
|
||||
*/
|
||||
public boolean isMonitoring() {
|
||||
synchronized (lock) {
|
||||
return !monitors.isEmpty();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -133,10 +133,6 @@ public class ParserHelper {
|
||||
}
|
||||
|
||||
public static void parseChainDocument(List<Document> documentList, Set<String> chainIdSet) {
|
||||
//用于存放抽象chain的map
|
||||
Map<String,Element> abstratChainMap = new HashMap<>();
|
||||
//用于存放已经解析过的实现chain
|
||||
Set<Element> implChainSet = new HashSet<>();
|
||||
// 先在元数据里放上chain
|
||||
// 先放有一个好处,可以在parse的时候先映射到FlowBus的chainMap,然后再去解析
|
||||
// 这样就不用去像之前的版本那样回归调用
|
||||
@@ -163,11 +159,14 @@ public class ParserHelper {
|
||||
if (chain != null){
|
||||
FlowBus.addChainPhase1(chain);
|
||||
}
|
||||
};
|
||||
}
|
||||
});
|
||||
// 清空
|
||||
chainIdSet.clear();
|
||||
|
||||
// 用于记录已经处理过的Chain
|
||||
Set<String> processedChainIds = new HashSet<>();
|
||||
|
||||
// 这里才是真正的编译阶段
|
||||
FlowBus.getChainMap().entrySet().forEach(entry -> {
|
||||
Chain chain = entry.getValue();
|
||||
@@ -176,7 +175,8 @@ public class ParserHelper {
|
||||
return;
|
||||
}
|
||||
|
||||
// TODO:这里还要处理抽象chain的子chain的EL替换问题
|
||||
// 处理抽象chain的继承关系
|
||||
processChainInheritance(chain, processedChainIds);
|
||||
|
||||
if (BooleanUtil.isFalse(chain.isCompiled())) {
|
||||
LiteFlowChainELBuilder.fromChain(chain).build();
|
||||
@@ -221,10 +221,6 @@ public class ParserHelper {
|
||||
}
|
||||
|
||||
public static void parseChainJson(List<JsonNode> flowJsonObjectList, Set<String> chainIdSet) {
|
||||
//用于存放抽象chain的map
|
||||
Map<String,JsonNode> abstratChainMap = new HashMap<>();
|
||||
//用于存放已经解析过的实现chain
|
||||
Set<JsonNode> implChainSet = new HashSet<>();
|
||||
// 先在元数据里放上chain
|
||||
// 先放有一个好处,可以在parse的时候先映射到FlowBus的chainMap,然后再去解析
|
||||
// 这样就不用去像之前的版本那样回归调用
|
||||
@@ -257,6 +253,9 @@ public class ParserHelper {
|
||||
// 清空
|
||||
chainIdSet.clear();
|
||||
|
||||
// 用于记录已经处理过的Chain
|
||||
Set<String> processedChainIds = new HashSet<>();
|
||||
|
||||
// 这里才是真正的编译阶段
|
||||
FlowBus.getChainMap().entrySet().forEach(entry -> {
|
||||
Chain chain = entry.getValue();
|
||||
@@ -265,7 +264,8 @@ public class ParserHelper {
|
||||
return;
|
||||
}
|
||||
|
||||
// TODO:这里还要处理抽象chain的子chain的EL替换问题
|
||||
// 处理抽象chain的继承关系
|
||||
processChainInheritance(chain, processedChainIds);
|
||||
|
||||
if (BooleanUtil.isFalse(chain.isCompiled())) {
|
||||
LiteFlowChainELBuilder.fromChain(chain).build();
|
||||
@@ -275,7 +275,7 @@ public class ParserHelper {
|
||||
|
||||
public static Chain parseOneChain(JsonNode chainNode) {
|
||||
// 先看是否可用
|
||||
String enableStr = chainNode.get(ENABLE) == null? StrUtil.EMPTY : chainNode.get(ENABLE).textValue();
|
||||
String enableStr = chainNode.get(ENABLE) == null? StrUtil.EMPTY : chainNode.get(ENABLE).asText();
|
||||
if (StrUtil.isNotBlank(enableStr) && Boolean.FALSE.toString().equalsIgnoreCase(enableStr)) {
|
||||
return null;
|
||||
}
|
||||
@@ -290,6 +290,8 @@ public class ParserHelper {
|
||||
String threadPoolExecutorClass = chainNode.get(THREAD_POOL_EXECUTOR_CLASS) == null ? null :
|
||||
chainNode.get(THREAD_POOL_EXECUTOR_CLASS).textValue();
|
||||
|
||||
String extendsChainId = chainNode.get(EXTENDS) == null ? null : chainNode.get(EXTENDS).textValue();
|
||||
|
||||
LiteFlowChainELBuilder builder =
|
||||
LiteFlowChainELBuilder.createChain().setChainId(chainId).setNamespace(namespace)
|
||||
.setThreadPoolExecutorClass(threadPoolExecutorClass);
|
||||
@@ -315,6 +317,11 @@ public class ParserHelper {
|
||||
builder.getChain().setAbstract(true);
|
||||
}
|
||||
|
||||
// 设置继承的父chain
|
||||
if (StrUtil.isNotBlank(extendsChainId)) {
|
||||
builder.getChain().setExtendsChainId(extendsChainId);
|
||||
}
|
||||
|
||||
return builder.getChain();
|
||||
}
|
||||
|
||||
@@ -336,6 +343,8 @@ public class ParserHelper {
|
||||
String threadPoolExecutorClass = e.attributeValue(THREAD_POOL_EXECUTOR_CLASS) == null ? null :
|
||||
e.attributeValue(THREAD_POOL_EXECUTOR_CLASS);
|
||||
|
||||
String extendsChainId = e.attributeValue(EXTENDS);
|
||||
|
||||
LiteFlowChainELBuilder builder =
|
||||
LiteFlowChainELBuilder.createChain().setChainId(chainId).setNamespace(namespace)
|
||||
.setThreadPoolExecutorClass(threadPoolExecutorClass);
|
||||
@@ -369,6 +378,11 @@ public class ParserHelper {
|
||||
builder.getChain().setAbstract(true);
|
||||
}
|
||||
|
||||
// 设置继承的父chain
|
||||
if (StrUtil.isNotBlank(extendsChainId)) {
|
||||
builder.getChain().setExtendsChainId(extendsChainId);
|
||||
}
|
||||
|
||||
return builder.getChain();
|
||||
}
|
||||
|
||||
@@ -383,6 +397,59 @@ public class ParserHelper {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理Chain的继承关系,替换抽象Chain的占位符
|
||||
* @param chain 需要处理的Chain
|
||||
* @param processedChainIds 已经处理过的Chain ID集合
|
||||
*/
|
||||
private static void processChainInheritance(Chain chain, Set<String> processedChainIds) {
|
||||
if (StrUtil.isNotBlank(chain.getExtendsChainId())) {
|
||||
resolveChainInheritance(chain, processedChainIds);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 递归解析Chain的继承关系
|
||||
* @param chain 需要处理的Chain
|
||||
* @param processedChainIds 已经处理过的Chain ID集合
|
||||
*/
|
||||
private static void resolveChainInheritance(Chain chain, Set<String> processedChainIds) {
|
||||
// 如果已经处理过,直接返回
|
||||
if (processedChainIds.contains(chain.getChainId())) {
|
||||
return;
|
||||
}
|
||||
|
||||
String extendsChainId = chain.getExtendsChainId();
|
||||
if (StrUtil.isBlank(extendsChainId)) {
|
||||
return;
|
||||
}
|
||||
|
||||
// 获取父Chain
|
||||
Chain parentChain = FlowBus.getChain(extendsChainId);
|
||||
if (parentChain == null) {
|
||||
throw new ChainNotFoundException(
|
||||
StrUtil.format("[abstract chain not found] chainId={}", extendsChainId)
|
||||
);
|
||||
}
|
||||
|
||||
// 如果父Chain也有继承关系,先递归处理父Chain
|
||||
if (StrUtil.isNotBlank(parentChain.getExtendsChainId())) {
|
||||
resolveChainInheritance(parentChain, processedChainIds);
|
||||
}
|
||||
|
||||
// 替换当前Chain的EL表达式
|
||||
String parentEl = parentChain.getEl();
|
||||
String currentEl = chain.getEl();
|
||||
|
||||
if (StrUtil.isNotBlank(parentEl) && StrUtil.isNotBlank(currentEl)) {
|
||||
String resolvedEl = ElRegexUtil.replaceAbstractChain(parentEl, currentEl);
|
||||
chain.setEl(resolvedEl);
|
||||
}
|
||||
|
||||
// 标记为已处理
|
||||
processedChainIds.add(chain.getChainId());
|
||||
}
|
||||
|
||||
/**
|
||||
* 解析一个带继承关系的Chain,xml格式
|
||||
* @param chain 实现Chain
|
||||
@@ -473,7 +540,7 @@ public class ParserHelper {
|
||||
}
|
||||
|
||||
private static Boolean getEnableByJsonNode(JsonNode nodeObject) {
|
||||
String enableStr = nodeObject.hasNonNull(ENABLE) ? nodeObject.get(ENABLE).toString() : "";
|
||||
String enableStr = nodeObject.hasNonNull(ENABLE) ? nodeObject.get(ENABLE).asText() : "";
|
||||
if (StrUtil.isBlank(enableStr)) {
|
||||
return true;
|
||||
}
|
||||
|
||||
@@ -12,7 +12,7 @@ public class BaseTest {
|
||||
|
||||
@AfterAll
|
||||
public static void cleanScanCache() {
|
||||
//ComponentScanner.cleanCache();
|
||||
FlowBus.cleanMonitorFile();
|
||||
FlowBus.cleanCache();
|
||||
ExecutorHelper.loadInstance().clearExecutorServiceMap();
|
||||
SpiFactoryInitializing.clean();
|
||||
|
||||
@@ -14,6 +14,7 @@ public class BaseTest {
|
||||
|
||||
@AfterAll
|
||||
public static void cleanScanCache() {
|
||||
FlowBus.cleanMonitorFile();
|
||||
ComponentScanner.cleanCache();
|
||||
FlowBus.cleanCache();
|
||||
ExecutorHelper.loadInstance().clearExecutorServiceMap();
|
||||
|
||||
@@ -14,6 +14,7 @@ public class BaseTest {
|
||||
|
||||
@AfterAll
|
||||
public static void cleanScanCache() {
|
||||
FlowBus.cleanMonitorFile();
|
||||
ComponentScanner.cleanCache();
|
||||
FlowBus.cleanCache();
|
||||
ExecutorHelper.loadInstance().clearExecutorServiceMap();
|
||||
|
||||
@@ -13,6 +13,7 @@ public class BaseTest {
|
||||
|
||||
@AfterAll
|
||||
public static void cleanScanCache() {
|
||||
FlowBus.cleanMonitorFile();
|
||||
FlowBus.cleanCache();
|
||||
ExecutorHelper.loadInstance().clearExecutorServiceMap();
|
||||
SpiFactoryInitializing.clean();
|
||||
|
||||
@@ -12,6 +12,7 @@ public class BaseTest {
|
||||
|
||||
@AfterAll
|
||||
public static void cleanScanCache() {
|
||||
FlowBus.cleanMonitorFile();
|
||||
FlowBus.cleanCache();
|
||||
ExecutorHelper.loadInstance().clearExecutorServiceMap();
|
||||
SpiFactoryInitializing.clean();
|
||||
|
||||
@@ -14,6 +14,7 @@ public class BaseTest {
|
||||
|
||||
@AfterAll
|
||||
public static void cleanScanCache() {
|
||||
FlowBus.cleanMonitorFile();
|
||||
ComponentScanner.cleanCache();
|
||||
FlowBus.cleanCache();
|
||||
ExecutorHelper.loadInstance().clearExecutorServiceMap();
|
||||
|
||||
@@ -79,9 +79,13 @@ public class MonitorFileELSpringbootTest extends BaseTest {
|
||||
}
|
||||
|
||||
@AfterEach
|
||||
public void afterEach(){
|
||||
public void afterEach() throws InterruptedException {
|
||||
String absolutePath = new ClassPathResource("classpath:/monitorFile/flow.el.xml").getAbsolutePath();
|
||||
FileUtil.writeString("<?xml version=\"1.0\" encoding=\"UTF-8\"?><flow><chain name=\"chain1\">THEN(a, b, c);</chain></flow>", new File(absolutePath), CharsetUtil.CHARSET_UTF_8);
|
||||
|
||||
// 等待文件监控处理完成,防止污染下一个测试
|
||||
// 监控间隔是2ms,等待100ms足够处理完成
|
||||
Thread.sleep(100);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -14,6 +14,7 @@ public class BaseTest {
|
||||
|
||||
@AfterAll
|
||||
public static void cleanScanCache() {
|
||||
FlowBus.cleanMonitorFile();
|
||||
ComponentScanner.cleanCache();
|
||||
FlowBus.cleanCache();
|
||||
ExecutorHelper.loadInstance().clearExecutorServiceMap();
|
||||
|
||||
Reference in New Issue
Block a user