diff --git a/liteflow-rule-plugin/liteflow-rule-sql/pom.xml b/liteflow-rule-plugin/liteflow-rule-sql/pom.xml
index e4933af70..f725f7cb8 100644
--- a/liteflow-rule-plugin/liteflow-rule-sql/pom.xml
+++ b/liteflow-rule-plugin/liteflow-rule-sql/pom.xml
@@ -20,5 +20,11 @@
true
provided
+
+
+ cn.hutool
+ hutool-crypto
+ ${hutool-crypto.version}
+
\ No newline at end of file
diff --git a/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/SQLXmlELParser.java b/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/SQLXmlELParser.java
index ce3bbe536..6f218de4a 100644
--- a/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/SQLXmlELParser.java
+++ b/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/SQLXmlELParser.java
@@ -5,6 +5,7 @@ import cn.hutool.core.bean.copier.CopyOptions;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.text.StrFormatter;
import cn.hutool.core.util.StrUtil;
+import com.yomahub.liteflow.core.FlowInitHook;
import com.yomahub.liteflow.parser.el.ClassXmlFlowELParser;
import com.yomahub.liteflow.parser.sql.exception.ELSQLException;
import com.yomahub.liteflow.parser.sql.util.JDBCHelper;
@@ -23,6 +24,8 @@ import java.util.Objects;
*/
public class SQLXmlELParser extends ClassXmlFlowELParser {
+ private static SQLParserVO sqlParserVO;
+
private static final String ERROR_MSG_PATTERN = "rule-source-ext-data {} is blank";
private static final String ERROR_COMMON_MSG = "rule-source-ext-data is empty";
@@ -34,7 +37,6 @@ public class SQLXmlELParser extends ClassXmlFlowELParser {
LiteflowConfig liteflowConfig = LiteflowConfigGetter.get();
try {
- SQLParserVO sqlParserVO = null;
if (MapUtil.isNotEmpty((liteflowConfig.getRuleSourceExtDataMap()))) {
sqlParserVO = BeanUtil.toBean(liteflowConfig.getRuleSourceExtDataMap(), SQLParserVO.class,
CopyOptions.create());
@@ -63,7 +65,20 @@ public class SQLXmlELParser extends ClassXmlFlowELParser {
@Override
public String parseCustom() {
- return JDBCHelper.getInstance().getContent();
+ try{
+ JDBCHelper jdbcHelper = JDBCHelper.getInstance();
+ String content = jdbcHelper.getContent();
+ if(sqlParserVO.getIfPolling()) {
+ FlowInitHook.addHook(() -> {
+ jdbcHelper.listenSQL();
+ return true;
+ });
+ }
+ return content;
+ }
+ catch (Exception ex) {
+ throw new ELSQLException(ex.getMessage());
+ }
}
/**
diff --git a/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/util/ChainPollingTask.java b/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/util/ChainPollingTask.java
new file mode 100644
index 000000000..7082ee49c
--- /dev/null
+++ b/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/util/ChainPollingTask.java
@@ -0,0 +1,149 @@
+package com.yomahub.liteflow.parser.sql.util;
+
+import cn.hutool.core.util.StrUtil;
+import com.yomahub.liteflow.builder.el.LiteFlowChainELBuilder;
+import com.yomahub.liteflow.flow.FlowBus;
+import com.yomahub.liteflow.log.LFLog;
+import com.yomahub.liteflow.log.LFLoggerManager;
+import com.yomahub.liteflow.parser.sql.exception.ELSQLException;
+import com.yomahub.liteflow.parser.sql.vo.SQLParserVO;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * 用于轮询chain的定时任务
+ *
+ * @author hxinyu
+ * @since 2.11.1
+ */
+public class ChainPollingTask implements Runnable {
+
+ private static final String SQL_PATTERN = "SELECT {},{} FROM {} WHERE {}=?";
+
+ private static final String NEW_CHAIN_PATTERN = "SELECT {} FROM {} WHERE {}=? AND {}=?";
+
+ private static final String SHA_PATTERN = "SHA1({})";
+
+ public static Connection conn;
+
+ private SQLParserVO sqlParserVO;
+
+ private Map chainSHAMap;
+
+ private static final Integer FETCH_SIZE_MAX = 1000;
+
+ LFLog LOG = LFLoggerManager.getLogger(ChainPollingTask.class);
+
+ public ChainPollingTask(SQLParserVO sqlParserVO, Map chainSHAMap) {
+ this.sqlParserVO = sqlParserVO;
+ this.chainSHAMap = chainSHAMap;
+ conn = LiteFlowJdbcUtil.getConn(sqlParserVO);
+ }
+
+ @Override
+ public void run() {
+ try{
+ PreparedStatement stmt;
+ ResultSet rs;
+ String chainTableName = sqlParserVO.getChainTableName();
+ String elDataField = sqlParserVO.getElDataField();
+ String chainNameField = sqlParserVO.getChainNameField();
+ String chainApplicationNameField = sqlParserVO.getChainApplicationNameField();
+ String applicationName = sqlParserVO.getApplicationName();
+
+ String SHAField = StrUtil.format(SHA_PATTERN, elDataField);
+ String sqlCmd = StrUtil.format(SQL_PATTERN, chainNameField, SHAField, chainTableName,
+ chainApplicationNameField);
+ stmt = conn.prepareStatement(sqlCmd, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+ // 设置游标拉取数量
+ stmt.setFetchSize(FETCH_SIZE_MAX);
+ stmt.setString(1, applicationName);
+ rs = stmt.executeQuery();
+
+ Set newChainSet = new HashSet<>();
+
+ while(rs.next()) {
+ String chainName = getStringFromResultSet(rs, chainNameField);
+ String newSHA = getStringFromResultSet(rs, SHAField);
+ newChainSet.add(chainName);
+ //如果封装的SHAMap中不存在该chain, 表示该chain为新增
+ if(!chainSHAMap.containsKey(chainName)) {
+ //获取新chain结果
+ ResultSet newChainRS = getNewChainRS(elDataField, chainTableName, chainNameField,
+ chainApplicationNameField, applicationName, chainName);
+ if(newChainRS.next()) {
+ String newELData = getStringFromResultSet(newChainRS, elDataField);
+ //新增chain
+ LiteFlowChainELBuilder.createChain().setChainId(chainName).setEL(newELData).build();
+ LOG.info("starting reload flow config... create key={} new value={},", chainName, newELData);
+ //加入到shaMap
+ chainSHAMap.put(chainName, newSHA);
+ }
+ }
+ else if (!StrUtil.equals(newSHA, chainSHAMap.get(chainName))) {
+ //SHA值发生变化,表示该chain的值已被修改,重新拉取变化的chain
+ ResultSet newChainRS = getNewChainRS(elDataField, chainTableName, chainNameField,
+ chainApplicationNameField, applicationName, chainName);
+ if(newChainRS.next()) {
+ String newELData = getStringFromResultSet(newChainRS, elDataField);
+ //修改chain
+ LiteFlowChainELBuilder.createChain().setChainId(chainName).setEL(newELData).build();
+ LOG.info("starting reload flow config... update key={} new value={},", chainName, newELData);
+ //修改shaMap
+ chainSHAMap.put(chainName, newSHA);
+ }
+ }
+ //SHA值无变化,表示该chain未改变
+ }
+
+ if(chainSHAMap.size() > newChainSet.size()) {
+ //如果遍历prepareStatement后修改过的SHAMap数量比最新chain总数多, 说明有两种情况:
+ // 1、删除了chain
+ // 2、修改了chainName:因为遍历到新的name时会加到SHAMap里,但没有机会删除旧的chain
+ // 3、上述两者结合
+ //在此处遍历chainSHAMap,把不在newChainSet中的chain删除
+ for (String chainName : chainSHAMap.keySet()) {
+ if(!newChainSet.contains(chainName)){
+ FlowBus.removeChain(chainName);
+ LOG.info("starting reload flow config... delete chain={}", chainName);
+ //修改SHAMap
+ chainSHAMap.remove(chainName);
+ }
+ }
+ }
+
+ }catch (Exception e) {
+ LOG.error("[Exception during SQL chain polling] " + e.getMessage(), e);
+ }
+ }
+
+ private ResultSet getNewChainRS(String elDataField, String chainTableName, String chainNameField,
+ String chainApplicationNameField, String applicationName, String chainName) {
+ ResultSet rs = null;
+ String sqlCmd = StrUtil.format(NEW_CHAIN_PATTERN, elDataField, chainTableName,
+ chainNameField, chainApplicationNameField);
+ try{
+ PreparedStatement stmt = conn.prepareStatement(sqlCmd, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+ stmt.setString(1, chainName);
+ stmt.setString(2, applicationName);
+ rs = stmt.executeQuery();
+ }catch (Exception e) {
+ throw new ELSQLException(e.getMessage());
+ }
+ return rs;
+ }
+
+ private String getStringFromResultSet(ResultSet rs, String field) throws SQLException {
+ String data = rs.getString(field);
+ if (StrUtil.isBlank(data)) {
+ throw new ELSQLException(StrUtil.format("exist {} field value is empty", field));
+ }
+ return data;
+ }
+}
diff --git a/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/util/JDBCHelper.java b/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/util/JDBCHelper.java
index 58d374b6e..250242b8f 100644
--- a/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/util/JDBCHelper.java
+++ b/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/util/JDBCHelper.java
@@ -1,8 +1,11 @@
package com.yomahub.liteflow.parser.sql.util;
import cn.hutool.core.collection.CollUtil;
+import cn.hutool.core.thread.NamedThreadFactory;
+import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.core.util.XmlUtil;
+import cn.hutool.crypto.digest.DigestUtil;
import com.yomahub.liteflow.enums.NodeTypeEnum;
import com.yomahub.liteflow.enums.ScriptTypeEnum;
import com.yomahub.liteflow.parser.sql.exception.ELSQLException;
@@ -12,9 +15,11 @@ import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Objects;
+import java.util.*;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
/**
* jdbc 工具类
@@ -48,6 +53,15 @@ public class JDBCHelper {
private static JDBCHelper INSTANCE;
+ //定时任务线程池核心线程数
+ private static final int CORE_POOL_SIZE = 2;
+
+ //定时任务线程池
+ private static ScheduledThreadPoolExecutor pollExecutor;
+
+ //chain的SHA1加密值 用于轮询时确定elDataField是否变化
+ private Map chainSHAMap = new HashMap<>();
+
/**
* 初始化 INSTANCE
*/
@@ -58,6 +72,15 @@ public class JDBCHelper {
Class.forName(sqlParserVO.getDriverClassName());
}
INSTANCE.setSqlParserVO(sqlParserVO);
+ //创建定时任务线程池
+ if (sqlParserVO.getIfPolling() && ObjectUtil.isNull(getPollExecutor())) {
+ ThreadFactory namedThreadFactory = new NamedThreadFactory("SQL-Polling-", false);
+ ScheduledThreadPoolExecutor threadPoolExecutor = new ScheduledThreadPoolExecutor(
+ CORE_POOL_SIZE,
+ namedThreadFactory,
+ new ThreadPoolExecutor.DiscardOldestPolicy());
+ setPollExecutor(threadPoolExecutor);
+ }
} catch (ClassNotFoundException e) {
throw new ELSQLException(e.getMessage());
}
@@ -109,6 +132,12 @@ public class JDBCHelper {
String chainName = getStringFromResultSet(rs, chainNameField);
result.add(StrUtil.format(CHAIN_XML_PATTERN, XmlUtil.escape(chainName), elData));
+
+ //如果需要轮询 计算该chainData的SHA值
+ if(sqlParserVO.getIfPolling()){
+ String chainSHA = DigestUtil.sha1Hex(elData);
+ chainSHAMap.put(chainName, chainSHA);
+ }
}
} catch (Exception e) {
throw new ELSQLException(e.getMessage());
@@ -129,6 +158,16 @@ public class JDBCHelper {
return StrUtil.format(XML_PATTERN, nodesContent, chainsContent);
}
+ /**
+ * 定时轮询拉取SQL中变化的数据
+ */
+ public void listenSQL() {
+ ChainPollingTask chainTask = new ChainPollingTask(sqlParserVO, chainSHAMap);
+ pollExecutor.scheduleAtFixedRate(chainTask, sqlParserVO.getPollingStartTime().longValue(),
+ sqlParserVO.getPollingInterval().longValue(), TimeUnit.SECONDS);
+ }
+
+
/**
* 获取脚本节点内容
*/
@@ -299,4 +338,11 @@ public class JDBCHelper {
this.sqlParserVO = sqlParserVO;
}
+ public static ScheduledThreadPoolExecutor getPollExecutor() {
+ return pollExecutor;
+ }
+
+ public static void setPollExecutor(ScheduledThreadPoolExecutor pollExecutor) {
+ JDBCHelper.pollExecutor = pollExecutor;
+ }
}
diff --git a/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/vo/SQLParserVO.java b/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/vo/SQLParserVO.java
index b549bafdc..4246b3e2d 100644
--- a/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/vo/SQLParserVO.java
+++ b/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/vo/SQLParserVO.java
@@ -90,6 +90,15 @@ public class SQLParserVO {
*/
private String scriptLanguageField;
+ /*是否开启轮询机制 默认不开启*/
+ private Boolean ifPolling = false;
+
+ /*轮询时间间隔(s) 默认60s*/
+ private Integer pollingInterval = 60;
+
+ /*规则配置后首次轮询的起始时间 默认为60s*/
+ private Integer pollingStartTime = 60;
+
public String getUrl() {
return url;
}
@@ -225,4 +234,27 @@ public class SQLParserVO {
return StrUtil.isBlank(url) && StrUtil.isBlank(username) && StrUtil.isBlank(password) && StrUtil.isBlank(driverClassName);
}
+ public Boolean getIfPolling() {
+ return ifPolling;
+ }
+
+ public void setIfPolling(Boolean ifPolling) {
+ this.ifPolling = ifPolling;
+ }
+
+ public Integer getPollingInterval() {
+ return pollingInterval;
+ }
+
+ public void setPollingInterval(Integer pollingInterval) {
+ this.pollingInterval = pollingInterval;
+ }
+
+ public Integer getPollingStartTime() {
+ return pollingStartTime;
+ }
+
+ public void setPollingStartTime(Integer pollingStartTime) {
+ this.pollingStartTime = pollingStartTime;
+ }
}