From 91dc95684e8365416847484fdb256df90919d04e Mon Sep 17 00:00:00 2001
From: gaibu <1016771049@qq.com>
Date: Thu, 28 Sep 2023 13:30:28 +0800
Subject: [PATCH] =?UTF-8?q?sql=20=E6=8F=92=E4=BB=B6=E9=87=8D=E6=9E=84?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.../liteflow/parser/constant/ReadType.java | 14 +
.../parser/constant/SqlReadConstant.java | 32 ++
.../liteflow/parser/sql/SQLXmlELParser.java | 4 +
.../sql/polling/AbstractSqlReadPollTask.java | 98 ++++++
.../parser/sql/polling/SqlReadPollTask.java | 18 +
.../sql/polling/impl/ChainReadPollTask.java | 56 +++
.../sql/polling/impl/ScriptReadPollTask.java | 50 +++
.../parser/sql/read/AbstractSqlRead.java | 98 ++++++
.../liteflow/parser/sql/read/SqlRead.java | 20 ++
.../parser/sql/read/SqlReadFactory.java | 48 +++
.../parser/sql/read/impl/ChainRead.java | 68 ++++
.../parser/sql/read/impl/ScriptRead.java | 151 ++++++++
.../parser/sql/util/ChainPollingTask.java | 126 -------
.../liteflow/parser/sql/util/JDBCHelper.java | 333 +++---------------
.../parser/sql/util/ScriptPollingTask.java | 145 --------
.../liteflow/parser/sql/vo/SQLParserVO.java | 25 +-
16 files changed, 730 insertions(+), 556 deletions(-)
create mode 100644 liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/constant/ReadType.java
create mode 100644 liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/constant/SqlReadConstant.java
create mode 100644 liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/polling/AbstractSqlReadPollTask.java
create mode 100644 liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/polling/SqlReadPollTask.java
create mode 100644 liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/polling/impl/ChainReadPollTask.java
create mode 100644 liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/polling/impl/ScriptReadPollTask.java
create mode 100644 liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/read/AbstractSqlRead.java
create mode 100644 liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/read/SqlRead.java
create mode 100644 liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/read/SqlReadFactory.java
create mode 100644 liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/read/impl/ChainRead.java
create mode 100644 liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/read/impl/ScriptRead.java
delete mode 100644 liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/util/ChainPollingTask.java
delete mode 100644 liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/util/ScriptPollingTask.java
diff --git a/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/constant/ReadType.java b/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/constant/ReadType.java
new file mode 100644
index 000000000..a687d53d1
--- /dev/null
+++ b/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/constant/ReadType.java
@@ -0,0 +1,14 @@
+package com.yomahub.liteflow.parser.constant;
+
+/**
+ * Copyright (C), 2021, 北京同创永益科技发展有限公司
+ *
+ * @author tangkc
+ * @version 3.0.0
+ * @description
+ * @date 2023/9/28 11:43
+ */
+public enum ReadType {
+ CHAIN,
+ SCRIPT;
+}
diff --git a/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/constant/SqlReadConstant.java b/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/constant/SqlReadConstant.java
new file mode 100644
index 000000000..127bc0084
--- /dev/null
+++ b/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/constant/SqlReadConstant.java
@@ -0,0 +1,32 @@
+package com.yomahub.liteflow.parser.constant;
+
+/**
+ * Copyright (C), 2021, 北京同创永益科技发展有限公司
+ *
+ * @author tangkc
+ * @version 3.0.0
+ * @description
+ * @date 2023/9/28 11:42
+ */
+public class SqlReadConstant {
+
+ public static final String SQL_PATTERN = "SELECT {},{} FROM {} WHERE {}=?";
+
+ public static final String SCRIPT_SQL_CHECK_PATTERN = "SELECT 1 FROM {} ";
+
+ public static final String SCRIPT_SQL_PATTERN = "SELECT {},{},{},{} FROM {} WHERE {}=?";
+
+ public static final String SCRIPT_WITH_LANGUAGE_SQL_PATTERN = "SELECT {},{},{},{},{} FROM {} WHERE {}=?";
+
+ public static final String CHAIN_XML_PATTERN = "";
+
+ public static final String NODE_XML_PATTERN = "{}";
+
+ public static final String NODE_ITEM_XML_PATTERN = "";
+
+ public static final String NODE_ITEM_WITH_LANGUAGE_XML_PATTERN = "";
+
+ public static final String XML_PATTERN = "{}{}";
+
+ public static final Integer FETCH_SIZE_MAX = 1000;
+}
diff --git a/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/SQLXmlELParser.java b/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/SQLXmlELParser.java
index 98fa7f640..13301dbd5 100644
--- a/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/SQLXmlELParser.java
+++ b/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/SQLXmlELParser.java
@@ -8,6 +8,7 @@ import cn.hutool.core.util.StrUtil;
import com.yomahub.liteflow.core.FlowInitHook;
import com.yomahub.liteflow.parser.el.ClassXmlFlowELParser;
import com.yomahub.liteflow.parser.sql.exception.ELSQLException;
+import com.yomahub.liteflow.parser.sql.read.SqlReadFactory;
import com.yomahub.liteflow.parser.sql.util.JDBCHelper;
import com.yomahub.liteflow.parser.sql.vo.SQLParserVO;
import com.yomahub.liteflow.property.LiteflowConfig;
@@ -53,6 +54,9 @@ public class SQLXmlELParser extends ClassXmlFlowELParser {
// 初始化 JDBCHelper
JDBCHelper.init(sqlParserVO);
+
+ // 初始化 SqlReadFactory
+ SqlReadFactory.registerRead(sqlParserVO);
}
catch (ELSQLException elsqlException) {
throw elsqlException;
diff --git a/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/polling/AbstractSqlReadPollTask.java b/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/polling/AbstractSqlReadPollTask.java
new file mode 100644
index 000000000..1fc75902d
--- /dev/null
+++ b/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/polling/AbstractSqlReadPollTask.java
@@ -0,0 +1,98 @@
+package com.yomahub.liteflow.parser.sql.polling;
+
+import cn.hutool.core.collection.CollUtil;
+import cn.hutool.core.map.MapUtil;
+import cn.hutool.core.util.StrUtil;
+import cn.hutool.crypto.digest.DigestUtil;
+import com.yomahub.liteflow.parser.sql.exception.ELSQLException;
+import com.yomahub.liteflow.parser.sql.read.SqlRead;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Copyright (C), 2021, 北京同创永益科技发展有限公司
+ *
+ * @author tangkc
+ * @version 3.0.0
+ * @description
+ * @date 2023/9/28 14:12
+ */
+public abstract class AbstractSqlReadPollTask implements SqlReadPollTask {
+ private final Map DATA_SHA_MAP;
+ private final SqlRead read;
+
+ public AbstractSqlReadPollTask(Map dataMap, SqlRead read) {
+ this.read = read;
+ this.DATA_SHA_MAP = shaMapValue(dataMap);
+
+ if (!read.type().equals(type())) {
+ throw new ELSQLException("SqlReadPollTask type not match");
+ }
+ }
+
+ @Override
+ public void execute() {
+ Map newData = read.read();
+ // 新增或者更新的元素
+ Map saveElementMap = new HashMap();
+ // 删除的元素
+ List deleteElementIds = new ArrayList<>();
+
+ for (Map.Entry entry : newData.entrySet()) {
+ String id = entry.getKey();
+ String element = entry.getValue();
+ String newSHA = DigestUtil.sha1Hex(element);
+
+ // 新增
+ // 如果封装的SHAMap中不存在该chain, 表示该元素为新增
+ if (!DATA_SHA_MAP.containsKey(id)) {
+ saveElementMap.put(id, element);
+
+ DATA_SHA_MAP.put(id, newSHA);
+ }
+ // 修改
+ // SHA值发生变化,表示该元素的值已被修改,重新拉取变化的chain
+ else if (!StrUtil.equals(newSHA, DATA_SHA_MAP.get(id))) {
+ saveElementMap.put(id, element);
+
+ DATA_SHA_MAP.put(id, newSHA);
+ }
+ }
+
+ Set oldIdList = DATA_SHA_MAP.keySet(); // 旧的 id 列表
+ Set newIdList = newData.keySet(); // 新的 id 列表
+ // 计算单差集
+ // 计算集合的单差集,即只返回【newIdList】中有,但是【oldIdList】中没有的元素,例如:
+ // subtractToList([1,2,3,4],[2,3,4,5]) -》 [1]
+ deleteElementIds = CollUtil.subtractToList(newIdList, oldIdList);
+
+ for (String id : deleteElementIds) {
+ DATA_SHA_MAP.remove(id);
+ }
+
+ if (CollUtil.isNotEmpty(saveElementMap)) {
+ doSave(saveElementMap);
+ }
+
+ if (CollUtil.isNotEmpty(deleteElementIds)) {
+ doDelete(deleteElementIds);
+ }
+ }
+
+ public abstract void doSave(Map saveElementMap);
+
+ public abstract void doDelete(List deleteElementId);
+
+ private Map shaMapValue(Map dataMap) {
+ Map result = new HashMap<>();
+ dataMap.forEach((k, v) -> {
+ result.put(k, DigestUtil.sha1Hex(v));
+ });
+
+ return result;
+ }
+}
diff --git a/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/polling/SqlReadPollTask.java b/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/polling/SqlReadPollTask.java
new file mode 100644
index 000000000..898dbfb21
--- /dev/null
+++ b/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/polling/SqlReadPollTask.java
@@ -0,0 +1,18 @@
+package com.yomahub.liteflow.parser.sql.polling;
+
+import com.yomahub.liteflow.parser.constant.ReadType;
+
+/**
+ * Copyright (C), 2021, 北京同创永益科技发展有限公司
+ *
+ * @author tangkc
+ * @version 3.0.0
+ * @description
+ * @date 2023/9/28 14:10
+ */
+public interface SqlReadPollTask {
+
+ void execute();
+
+ ReadType type();
+}
diff --git a/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/polling/impl/ChainReadPollTask.java b/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/polling/impl/ChainReadPollTask.java
new file mode 100644
index 000000000..1eaac11c4
--- /dev/null
+++ b/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/polling/impl/ChainReadPollTask.java
@@ -0,0 +1,56 @@
+package com.yomahub.liteflow.parser.sql.polling.impl;
+
+import com.yomahub.liteflow.builder.el.LiteFlowChainELBuilder;
+import com.yomahub.liteflow.flow.FlowBus;
+import com.yomahub.liteflow.parser.constant.ReadType;
+import com.yomahub.liteflow.parser.helper.ParserHelper;
+import com.yomahub.liteflow.parser.sql.polling.AbstractSqlReadPollTask;
+import com.yomahub.liteflow.parser.sql.polling.SqlReadPollTask;
+import com.yomahub.liteflow.parser.sql.read.SqlRead;
+import org.dom4j.Document;
+import org.dom4j.DocumentHelper;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static com.yomahub.liteflow.common.ChainConstant.ID;
+import static com.yomahub.liteflow.common.ChainConstant.NAME;
+
+/**
+ * Copyright (C), 2021, 北京同创永益科技发展有限公司
+ *
+ * @author tangkc
+ * @version 3.0.0
+ * @description
+ * @date 2023/9/28 14:46
+ */
+public class ChainReadPollTask extends AbstractSqlReadPollTask {
+
+ public ChainReadPollTask(Map dataMap, SqlRead read) {
+ super(dataMap, read);
+ }
+
+ @Override
+ public void doSave(Map saveElementMap) {
+ for (Map.Entry entry : saveElementMap.entrySet()) {
+ String chainName = entry.getKey();
+ String newData = entry.getValue();
+
+ LiteFlowChainELBuilder.createChain().setChainId(chainName).setEL(newData).build();
+ }
+ }
+
+ @Override
+ public void doDelete(List deleteElementId) {
+ for (String id : deleteElementId) {
+ FlowBus.removeChain(id);
+ }
+ }
+
+ @Override
+ public ReadType type() {
+ return ReadType.CHAIN;
+ }
+
+}
diff --git a/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/polling/impl/ScriptReadPollTask.java b/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/polling/impl/ScriptReadPollTask.java
new file mode 100644
index 000000000..41dd5b4c4
--- /dev/null
+++ b/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/polling/impl/ScriptReadPollTask.java
@@ -0,0 +1,50 @@
+package com.yomahub.liteflow.parser.sql.polling.impl;
+
+import com.yomahub.liteflow.flow.FlowBus;
+import com.yomahub.liteflow.parser.constant.ReadType;
+import com.yomahub.liteflow.parser.helper.NodeConvertHelper;
+import com.yomahub.liteflow.parser.sql.polling.AbstractSqlReadPollTask;
+import com.yomahub.liteflow.parser.sql.read.SqlRead;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Copyright (C), 2021, 北京同创永益科技发展有限公司
+ *
+ * @author tangkc
+ * @version 3.0.0
+ * @description
+ * @date 2023/9/28 15:03
+ */
+public class ScriptReadPollTask extends AbstractSqlReadPollTask {
+ public ScriptReadPollTask(Map dataMap, SqlRead read) {
+ super(dataMap, read);
+ }
+
+ @Override
+ public void doSave(Map saveElementMap) {
+ for (Map.Entry entry : saveElementMap.entrySet()) {
+ String scriptKey = entry.getKey();
+ String newData = entry.getValue();
+
+ NodeConvertHelper.NodeSimpleVO scriptVO = NodeConvertHelper.convert(scriptKey);
+ NodeConvertHelper.changeScriptNode(scriptVO, newData);
+ }
+ }
+
+ @Override
+ public void doDelete(List deleteElementId) {
+ for (String id : deleteElementId) {
+ NodeConvertHelper.NodeSimpleVO scriptVO = NodeConvertHelper.convert(id);
+
+ // 删除script
+ FlowBus.getNodeMap().remove(scriptVO.getNodeId());
+ }
+ }
+
+ @Override
+ public ReadType type() {
+ return ReadType.SCRIPT;
+ }
+}
diff --git a/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/read/AbstractSqlRead.java b/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/read/AbstractSqlRead.java
new file mode 100644
index 000000000..e917a6161
--- /dev/null
+++ b/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/read/AbstractSqlRead.java
@@ -0,0 +1,98 @@
+package com.yomahub.liteflow.parser.sql.read;
+
+import cn.hutool.core.util.StrUtil;
+import com.yomahub.liteflow.log.LFLog;
+import com.yomahub.liteflow.log.LFLoggerManager;
+import com.yomahub.liteflow.parser.constant.SqlReadConstant;
+import com.yomahub.liteflow.parser.sql.exception.ELSQLException;
+import com.yomahub.liteflow.parser.sql.read.impl.ScriptRead;
+import com.yomahub.liteflow.parser.sql.util.LiteFlowJdbcUtil;
+import com.yomahub.liteflow.parser.sql.vo.SQLParserVO;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Copyright (C), 2021, 北京同创永益科技发展有限公司
+ *
+ * @author tangkc
+ * @version 3.0.0
+ * @description
+ * @date 2023/9/28 11:26
+ */
+public abstract class AbstractSqlRead implements SqlRead {
+ public final SQLParserVO config;
+ private static LFLog LOG = LFLoggerManager.getLogger(AbstractSqlRead.class);
+
+ public AbstractSqlRead(SQLParserVO config) {
+ this.config = config;
+ }
+
+ @Override
+ public Map read() {
+ // 如果不需要读取直接返回
+ if (!needRead()) {
+ return new HashMap<>();
+ }
+
+ Map result = new HashMap<>();
+ String sqlCmd = buildQuerySql();
+ if (config.getSqlLogEnabled()) {
+ LOG.info("query sql:{}", sqlCmd.replace("?", "'" + config.getApplicationName() + "'"));
+ }
+
+ Connection conn = null;
+ PreparedStatement stmt = null;
+ ResultSet rs = null;
+ try {
+ conn = LiteFlowJdbcUtil.getConn(config);
+ stmt = conn.prepareStatement(sqlCmd, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+ // 设置游标拉取数量
+ stmt.setFetchSize(SqlReadConstant.FETCH_SIZE_MAX);
+ stmt.setString(1, config.getApplicationName());
+ rs = stmt.executeQuery();
+
+ while (rs.next()) {
+ String xml = buildXmlElement(rs);
+ String uniqueKey = buildXmlElementUniqueKey(rs);
+
+ result.put(uniqueKey, xml);
+ }
+ } catch (Exception e) {
+ throw new ELSQLException(e.getMessage());
+ } finally {
+ // 关闭连接
+ LiteFlowJdbcUtil.close(conn, stmt, rs);
+ }
+
+ return result;
+ }
+
+ public abstract String buildQuerySql();
+
+ public abstract String buildXmlElement(ResultSet rs) throws SQLException;
+
+ public abstract String buildXmlElementUniqueKey(ResultSet rs) throws SQLException;
+
+ /**
+ * 是否可以读取
+ * chain 默认可以读取
+ * script 需要判断是否有配置
+ */
+ public boolean needRead() {
+ return true;
+ }
+
+
+ public String getStringFromResultSet(ResultSet rs, String field) throws SQLException {
+ String data = rs.getString(field);
+ if (StrUtil.isBlank(data)) {
+ throw new ELSQLException(StrUtil.format("exist {} field value is empty", field));
+ }
+ return data;
+ }
+}
diff --git a/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/read/SqlRead.java b/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/read/SqlRead.java
new file mode 100644
index 000000000..c94c54570
--- /dev/null
+++ b/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/read/SqlRead.java
@@ -0,0 +1,20 @@
+package com.yomahub.liteflow.parser.sql.read;
+
+import com.yomahub.liteflow.parser.constant.ReadType;
+
+import java.util.Map;
+
+/**
+ * Copyright (C), 2021, 北京同创永益科技发展有限公司
+ *
+ * @author tangkc
+ * @version 3.0.0
+ * @description
+ * @date 2023/9/28 11:25
+ */
+public interface SqlRead {
+
+ Map read();
+
+ ReadType type();
+}
diff --git a/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/read/SqlReadFactory.java b/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/read/SqlReadFactory.java
new file mode 100644
index 000000000..4f4f531e4
--- /dev/null
+++ b/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/read/SqlReadFactory.java
@@ -0,0 +1,48 @@
+package com.yomahub.liteflow.parser.sql.read;
+
+import com.yomahub.liteflow.parser.constant.ReadType;
+import com.yomahub.liteflow.parser.sql.polling.SqlReadPollTask;
+import com.yomahub.liteflow.parser.sql.polling.impl.ChainReadPollTask;
+import com.yomahub.liteflow.parser.sql.polling.impl.ScriptReadPollTask;
+import com.yomahub.liteflow.parser.sql.read.impl.ChainRead;
+import com.yomahub.liteflow.parser.sql.read.impl.ScriptRead;
+import com.yomahub.liteflow.parser.sql.vo.SQLParserVO;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Copyright (C), 2021, 北京同创永益科技发展有限公司
+ *
+ * @author tangkc
+ * @version 3.0.0
+ * @description
+ * @date 2023/9/28 15:42
+ */
+public class SqlReadFactory {
+ private static final Map READ_MAP = new HashMap<>();
+ private static final Map POLL_TASK_MAP = new HashMap<>();
+
+ public static void registerRead(SQLParserVO config) {
+ READ_MAP.put(ReadType.CHAIN, new ChainRead(config));
+ READ_MAP.put(ReadType.SCRIPT, new ScriptRead(config));
+ }
+
+ public static void registerSqlReadPollTask(ReadType readType, Map dataMap) {
+ SqlRead sqlRead = getSqlRead(readType);
+ if (ReadType.CHAIN.equals(readType)) {
+ POLL_TASK_MAP.put(ReadType.CHAIN, new ChainReadPollTask(dataMap, sqlRead));
+ } else if (ReadType.SCRIPT.equals(readType)) {
+ POLL_TASK_MAP.put(ReadType.SCRIPT, new ScriptReadPollTask(dataMap, sqlRead));
+ }
+
+ }
+
+ public static SqlRead getSqlRead(ReadType readType) {
+ return READ_MAP.get(readType);
+ }
+
+ public static SqlReadPollTask getSqlReadPollTask(ReadType readType) {
+ return POLL_TASK_MAP.get(readType);
+ }
+}
diff --git a/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/read/impl/ChainRead.java b/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/read/impl/ChainRead.java
new file mode 100644
index 000000000..a8c776b3e
--- /dev/null
+++ b/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/read/impl/ChainRead.java
@@ -0,0 +1,68 @@
+package com.yomahub.liteflow.parser.sql.read.impl;
+
+import cn.hutool.core.util.StrUtil;
+import cn.hutool.core.util.XmlUtil;
+import com.yomahub.liteflow.parser.constant.ReadType;
+import com.yomahub.liteflow.parser.constant.SqlReadConstant;
+import com.yomahub.liteflow.parser.sql.exception.ELSQLException;
+import com.yomahub.liteflow.parser.sql.read.AbstractSqlRead;
+import com.yomahub.liteflow.parser.sql.vo.SQLParserVO;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+/**
+ * Copyright (C), 2021, 北京同创永益科技发展有限公司
+ *
+ * @author tangkc
+ * @version 3.0.0
+ * @description
+ * @date 2023/9/28 11:44
+ */
+public class ChainRead extends AbstractSqlRead {
+
+ public ChainRead(SQLParserVO config) {
+ super(config);
+ }
+
+ @Override
+ public String buildQuerySql() {
+ String chainTableName = super.config.getChainTableName();
+ String elDataField = super.config.getElDataField();
+ String chainNameField = super.config.getChainNameField();
+ String chainApplicationNameField = super.config.getChainApplicationNameField();
+ String applicationName = super.config.getApplicationName();
+
+ if (StrUtil.isBlank(chainTableName)) {
+ throw new ELSQLException("You did not define the chainTableName property");
+ }
+
+ if (StrUtil.isBlank(applicationName) || StrUtil.isBlank(chainApplicationNameField)) {
+ throw new ELSQLException("You did not define the applicationName or chainApplicationNameField property");
+ }
+
+ String sqlCmd = StrUtil.format(SqlReadConstant.SQL_PATTERN, chainNameField, elDataField, chainTableName,
+ chainApplicationNameField);
+
+ return sqlCmd;
+ }
+
+ @Override
+ public String buildXmlElement(ResultSet rs) throws SQLException {
+ String elDataField = super.config.getElDataField();
+
+ return getStringFromResultSet(rs, elDataField);
+ }
+
+ @Override
+ public String buildXmlElementUniqueKey(ResultSet rs) throws SQLException {
+ String chainNameField = super.config.getChainNameField();
+
+ return getStringFromResultSet(rs, chainNameField);
+ }
+
+ @Override
+ public ReadType type() {
+ return ReadType.CHAIN;
+ }
+}
diff --git a/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/read/impl/ScriptRead.java b/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/read/impl/ScriptRead.java
new file mode 100644
index 000000000..8c04d5086
--- /dev/null
+++ b/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/read/impl/ScriptRead.java
@@ -0,0 +1,151 @@
+package com.yomahub.liteflow.parser.sql.read.impl;
+
+import cn.hutool.core.collection.CollUtil;
+import cn.hutool.core.util.StrUtil;
+import cn.hutool.core.util.XmlUtil;
+import com.yomahub.liteflow.enums.NodeTypeEnum;
+import com.yomahub.liteflow.enums.ScriptTypeEnum;
+import com.yomahub.liteflow.log.LFLog;
+import com.yomahub.liteflow.log.LFLoggerManager;
+import com.yomahub.liteflow.parser.constant.ReadType;
+import com.yomahub.liteflow.parser.constant.SqlReadConstant;
+import com.yomahub.liteflow.parser.sql.exception.ELSQLException;
+import com.yomahub.liteflow.parser.sql.read.AbstractSqlRead;
+import com.yomahub.liteflow.parser.sql.util.LiteFlowJdbcUtil;
+import com.yomahub.liteflow.parser.sql.vo.SQLParserVO;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Copyright (C), 2021, 北京同创永益科技发展有限公司
+ *
+ * @author tangkc
+ * @version 3.0.0
+ * @description
+ * @date 2023/9/28 11:49
+ */
+public class ScriptRead extends AbstractSqlRead {
+
+ public ScriptRead(SQLParserVO config) {
+ super(config);
+ }
+
+ @Override
+ public String buildQuerySql() {
+ String scriptLanguageField = super.config.getScriptLanguageField();
+ String scriptTableName = super.config.getScriptTableName();
+ String scriptIdField = super.config.getScriptIdField();
+ String scriptDataField = super.config.getScriptDataField();
+ String scriptNameField = super.config.getScriptNameField();
+ String scriptTypeField = super.config.getScriptTypeField();
+ String scriptApplicationNameField = super.config.getScriptApplicationNameField();
+ String applicationName = super.config.getApplicationName();
+
+ if (StrUtil.isBlank(applicationName) || StrUtil.isBlank(scriptApplicationNameField)) {
+ throw new ELSQLException("You did not define the applicationName or scriptApplicationNameField property");
+ }
+
+ String sqlCmd = null;
+ // 脚本节点(带语言)
+ if (withLanguage()) {
+ sqlCmd = StrUtil.format(
+ SqlReadConstant.SCRIPT_WITH_LANGUAGE_SQL_PATTERN,
+ scriptIdField,
+ scriptDataField,
+ scriptNameField,
+ scriptTypeField,
+ scriptLanguageField,
+ scriptTableName,
+ scriptApplicationNameField
+ );
+
+ return sqlCmd;
+ }
+ // 脚本节点(不带语言)
+ else {
+ sqlCmd = StrUtil.format(
+ SqlReadConstant.SCRIPT_SQL_PATTERN,
+ scriptIdField,
+ scriptDataField,
+ scriptNameField,
+ scriptTypeField,
+ scriptTableName,
+ scriptApplicationNameField
+ );
+ }
+
+
+ return sqlCmd;
+ }
+
+ @Override
+ public String buildXmlElement(ResultSet rs) throws SQLException {
+ String scriptDataField = super.config.getScriptDataField();
+
+ return getStringFromResultSet(rs, scriptDataField);
+
+ }
+
+ @Override
+ public String buildXmlElementUniqueKey(ResultSet rs) throws SQLException {
+ String scriptIdField = super.config.getScriptIdField();
+ String scriptNameField = super.config.getScriptNameField();
+ String scriptTypeField = super.config.getScriptTypeField();
+ String scriptLanguageField = super.config.getScriptLanguageField();
+
+ String id = getStringFromResultSet(rs, scriptIdField);
+ String name = getStringFromResultSet(rs, scriptNameField);
+ String type = getStringFromResultSet(rs, scriptTypeField);
+ String language = withLanguage() ? getStringFromResultSet(rs, scriptLanguageField) : null;
+
+ NodeTypeEnum nodeTypeEnum = NodeTypeEnum.getEnumByCode(type);
+ if (Objects.isNull(nodeTypeEnum)) {
+ throw new ELSQLException(StrUtil.format("Invalid type value[{}]", type));
+ }
+
+ if (!nodeTypeEnum.isScript()) {
+ throw new ELSQLException(StrUtil.format("The type value[{}] is not a script type", type));
+ }
+
+ if (withLanguage() && !ScriptTypeEnum.checkScriptType(language)) {
+ throw new ELSQLException(StrUtil.format("The language value[{}] is invalid", language));
+ }
+ List keys = CollUtil.newArrayList(id, type, name);
+ if (StrUtil.isNotBlank(language)) {
+ keys.add(language);
+ }
+
+ return StrUtil.join(StrUtil.COLON, keys);
+ }
+
+ @Override
+ public boolean needRead() {
+ if (StrUtil.isBlank(super.config.getScriptTableName())) {
+ return false;
+ }
+
+ String sqlCmd = StrUtil.format(
+ SqlReadConstant.SCRIPT_SQL_CHECK_PATTERN,
+ super.config.getScriptTableName()
+ );
+
+ Connection conn = LiteFlowJdbcUtil.getConn(super.config);
+ return LiteFlowJdbcUtil.checkConnectionCanExecuteSql(conn, sqlCmd);
+ }
+
+ @Override
+ public ReadType type() {
+ return ReadType.SCRIPT;
+ }
+
+ /**
+ * 脚本是否带语言
+ */
+ private boolean withLanguage() {
+ return StrUtil.isNotBlank(super.config.getScriptLanguageField());
+ }
+}
diff --git a/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/util/ChainPollingTask.java b/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/util/ChainPollingTask.java
deleted file mode 100644
index ac6b9be84..000000000
--- a/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/util/ChainPollingTask.java
+++ /dev/null
@@ -1,126 +0,0 @@
-package com.yomahub.liteflow.parser.sql.util;
-
-import cn.hutool.core.util.StrUtil;
-import cn.hutool.crypto.digest.DigestUtil;
-import com.yomahub.liteflow.builder.el.LiteFlowChainELBuilder;
-import com.yomahub.liteflow.flow.FlowBus;
-import com.yomahub.liteflow.log.LFLog;
-import com.yomahub.liteflow.log.LFLoggerManager;
-import com.yomahub.liteflow.parser.sql.exception.ELSQLException;
-import com.yomahub.liteflow.parser.sql.vo.SQLParserVO;
-
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
-
-/**
- * 用于轮询chain的定时任务
- *
- * @author hxinyu
- * @since 2.11.1
- */
-public class ChainPollingTask implements Runnable {
-
- private static final String SQL_PATTERN = "SELECT {},{} FROM {} WHERE {}=?";
-
- private Connection conn;
-
- private SQLParserVO sqlParserVO;
-
- private Map chainSHAMap;
-
- private static final Integer FETCH_SIZE_MAX = 1000;
-
- LFLog LOG = LFLoggerManager.getLogger(ChainPollingTask.class);
-
- public ChainPollingTask(SQLParserVO sqlParserVO, Map chainSHAMap) {
- this.sqlParserVO = sqlParserVO;
- this.chainSHAMap = chainSHAMap;
- }
-
- @Override
- public void run() {
- conn = LiteFlowJdbcUtil.getConn(sqlParserVO);
- PreparedStatement stmt = null;
- ResultSet rs = null;
- try{
- String chainTableName = sqlParserVO.getChainTableName();
- String elDataField = sqlParserVO.getElDataField();
- String chainNameField = sqlParserVO.getChainNameField();
- String chainApplicationNameField = sqlParserVO.getChainApplicationNameField();
- String applicationName = sqlParserVO.getApplicationName();
-
- String sqlCmd = StrUtil.format(SQL_PATTERN, chainNameField, elDataField, chainTableName,
- chainApplicationNameField);
- stmt = conn.prepareStatement(sqlCmd, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
- // 设置游标拉取数量
- stmt.setFetchSize(FETCH_SIZE_MAX);
- stmt.setString(1, applicationName);
- rs = stmt.executeQuery();
-
- Set newChainSet = new HashSet<>();
-
- while(rs.next()) {
- String chainName = getStringFromResultSet(rs, chainNameField);
- String newData = getStringFromResultSet(rs, elDataField);
- String newSHA = DigestUtil.sha1Hex(newData);
- newChainSet.add(chainName);
- //如果封装的SHAMap中不存在该chain, 表示该chain为新增
- if(!chainSHAMap.containsKey(chainName)) {
- //新增chain
- LiteFlowChainELBuilder.createChain().setChainId(chainName).setEL(newData).build();
- LOG.info("starting reload flow config... create chain={}, new value={},", chainName, newData);
- //加入到shaMap
- chainSHAMap.put(chainName, newSHA);
-
- }
- else if (!StrUtil.equals(newSHA, chainSHAMap.get(chainName))) {
- //SHA值发生变化,表示该chain的值已被修改,重新拉取变化的chain
- //修改chain
- LiteFlowChainELBuilder.createChain().setChainId(chainName).setEL(newData).build();
- LOG.info("starting reload flow config... update chain={}, new value={},", chainName, newData);
- //修改shaMap
- chainSHAMap.put(chainName, newSHA);
- }
- //SHA值无变化,表示该chain未改变
- }
-
- if(chainSHAMap.size() > newChainSet.size()) {
- //如果遍历prepareStatement后修改过的SHAMap数量比最新chain总数多, 说明有两种情况:
- // 1、删除了chain
- // 2、修改了chainName:因为遍历到新的name时会加到SHAMap里,但没有机会删除旧的chain
- // 3、上述两者结合
- //在此处遍历chainSHAMap,把不在newChainSet中的chain删除
- //这里用iterator是为避免在遍历集合时删除元素导致ConcurrentModificationException
- Iterator iterator = chainSHAMap.keySet().iterator();
- while(iterator.hasNext()) {
- String chainName = iterator.next();
- if(!newChainSet.contains(chainName)) {
- FlowBus.removeChain(chainName);
- LOG.info("starting reload flow config... delete chain={}", chainName);
- //修改SHAMap
- iterator.remove();
- }
- }
- }
- } catch (Exception e) {
- LOG.error("[Exception during SQL chain polling] " + e.getMessage(), e);
- } finally {
- // 关闭连接
- LiteFlowJdbcUtil.close(conn, stmt, rs);
- }
- }
-
- private String getStringFromResultSet(ResultSet rs, String field) throws SQLException {
- String data = rs.getString(field);
- if (StrUtil.isBlank(data)) {
- throw new ELSQLException(StrUtil.format("exist {} field value is empty", field));
- }
- return data;
- }
-}
diff --git a/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/util/JDBCHelper.java b/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/util/JDBCHelper.java
index 8eb6dc4b8..aae5e44f7 100644
--- a/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/util/JDBCHelper.java
+++ b/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/util/JDBCHelper.java
@@ -5,22 +5,21 @@ import cn.hutool.core.thread.NamedThreadFactory;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.core.util.XmlUtil;
-import cn.hutool.crypto.digest.DigestUtil;
-import com.yomahub.liteflow.enums.NodeTypeEnum;
-import com.yomahub.liteflow.enums.ScriptTypeEnum;
-import com.yomahub.liteflow.parser.sql.exception.ELSQLException;
-import com.yomahub.liteflow.parser.sql.vo.SQLParserVO;
+import com.yomahub.liteflow.parser.constant.ReadType;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
+import com.yomahub.liteflow.parser.helper.NodeConvertHelper;
+import com.yomahub.liteflow.parser.sql.exception.ELSQLException;
+import com.yomahub.liteflow.parser.sql.read.SqlRead;
+import com.yomahub.liteflow.parser.sql.read.SqlReadFactory;
+import com.yomahub.liteflow.parser.sql.vo.SQLParserVO;
+import org.apache.commons.lang.StringUtils;
import java.util.*;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import static com.yomahub.liteflow.parser.constant.SqlReadConstant.*;
/**
* jdbc 工具类
*
@@ -29,26 +28,6 @@ import java.util.concurrent.TimeUnit;
*/
public class JDBCHelper {
- private static final String SQL_PATTERN = "SELECT {},{} FROM {} WHERE {}=?";
-
- private static final String SCRIPT_SQL_CHECK_PATTERN = "SELECT 1 FROM {} WHERE {}=?";
-
- private static final String SCRIPT_SQL_PATTERN = "SELECT {},{},{},{} FROM {} WHERE {}=?";
-
- private static final String SCRIPT_WITH_LANGUAG_SQL_PATTERN = "SELECT {},{},{},{},{} FROM {} WHERE {}=?";
-
- private static final String CHAIN_XML_PATTERN = "";
-
- private static final String NODE_XML_PATTERN = "{}";
-
- private static final String NODE_ITEM_XML_PATTERN = "";
-
- private static final String NODE_ITEM_WITH_LANGUAGE_XML_PATTERN = "";
-
- private static final String XML_PATTERN = "{}{}";
-
- private static final Integer FETCH_SIZE_MAX = 1000;
-
private SQLParserVO sqlParserVO;
private static JDBCHelper INSTANCE;
@@ -59,12 +38,6 @@ public class JDBCHelper {
//定时任务线程池
private static ScheduledThreadPoolExecutor pollExecutor;
- //chain的SHA1加密值 用于轮询时确定chain是否变化
- private Map chainSHAMap = new HashMap<>();
-
- //script的SHA1加密值 用于轮询时确定script是否变化
- private Map scriptSHAMap = new HashMap<>();
-
/**
* 初始化 INSTANCE
*/
@@ -75,13 +48,10 @@ public class JDBCHelper {
Class.forName(sqlParserVO.getDriverClassName());
}
INSTANCE.setSqlParserVO(sqlParserVO);
- //创建定时任务线程池
+ // 创建定时任务线程池
if (sqlParserVO.getPollingEnabled() && ObjectUtil.isNull(getPollExecutor())) {
ThreadFactory namedThreadFactory = new NamedThreadFactory("SQL-Polling-", false);
- ScheduledThreadPoolExecutor threadPoolExecutor = new ScheduledThreadPoolExecutor(
- CORE_POOL_SIZE,
- namedThreadFactory,
- new ThreadPoolExecutor.DiscardOldestPolicy());
+ ScheduledThreadPoolExecutor threadPoolExecutor = new ScheduledThreadPoolExecutor(CORE_POOL_SIZE, namedThreadFactory, new ThreadPoolExecutor.DiscardOldestPolicy());
setPollExecutor(threadPoolExecutor);
}
} catch (ClassNotFoundException e) {
@@ -100,64 +70,38 @@ public class JDBCHelper {
* 获取 ElData 数据内容
*/
public String getContent() {
- Connection conn = null;
- PreparedStatement stmt = null;
- ResultSet rs = null;
+ SqlRead chainRead = SqlReadFactory.getSqlRead(ReadType.CHAIN);
+ SqlRead scriptRead = SqlReadFactory.getSqlRead(ReadType.SCRIPT);
- String chainTableName = sqlParserVO.getChainTableName();
- String elDataField = sqlParserVO.getElDataField();
- String chainNameField = sqlParserVO.getChainNameField();
- String chainApplicationNameField = sqlParserVO.getChainApplicationNameField();
- String applicationName = sqlParserVO.getApplicationName();
+ // 获取 chain 数据
+ Map chainMap = chainRead.read();
+ List chainList = new ArrayList<>();
+ chainMap.forEach((chainName, elData) -> {
+ chainList.add(StrUtil.format(CHAIN_XML_PATTERN, XmlUtil.escape(chainName), elData));
+ });
+ String chainsContent = CollUtil.join(chainList, StrUtil.EMPTY);
- if (StrUtil.isBlank(chainTableName)) {
- throw new ELSQLException("You did not define the chainTableName property");
- }
+ // 获取脚本数据
+ Map scriptMap = scriptRead.read();
+ List scriptList = new ArrayList<>();
+ scriptMap.forEach((scriptKey, elData) -> {
+ NodeConvertHelper.NodeSimpleVO scriptVO = NodeConvertHelper.convert(scriptKey);
+ String id = scriptVO.getNodeId();
+ String name = scriptVO.getName();
+ String type = scriptVO.getType();
+ String language = scriptVO.getLanguage();
- if (StrUtil.isBlank(applicationName) || StrUtil.isBlank(chainApplicationNameField)) {
- throw new ELSQLException("You did not define the applicationName or chainApplicationNameField property");
- }
-
- String sqlCmd = StrUtil.format(SQL_PATTERN, chainNameField, elDataField, chainTableName,
- chainApplicationNameField);
-
- List result = new ArrayList<>();
- try {
- conn = LiteFlowJdbcUtil.getConn(sqlParserVO);
- stmt = conn.prepareStatement(sqlCmd, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
- // 设置游标拉取数量
- stmt.setFetchSize(FETCH_SIZE_MAX);
- stmt.setString(1, applicationName);
- rs = stmt.executeQuery();
-
- while (rs.next()) {
- String elData = getStringFromResultSet(rs, elDataField);
- String chainName = getStringFromResultSet(rs, chainNameField);
-
- result.add(StrUtil.format(CHAIN_XML_PATTERN, XmlUtil.escape(chainName), elData));
-
- //如果需要轮询 计算该chainData的SHA值
- if(sqlParserVO.getPollingEnabled()){
- String chainSHA = DigestUtil.sha1Hex(elData);
- chainSHAMap.put(chainName, chainSHA);
- }
+ if (StringUtils.isNotBlank(scriptVO.getLanguage())) {
+ scriptList.add(StrUtil.format(NODE_ITEM_WITH_LANGUAGE_XML_PATTERN, XmlUtil.escape(id), XmlUtil.escape(name),type, language, elData));
+ } else {
+ scriptList.add(StrUtil.format(NODE_ITEM_XML_PATTERN, XmlUtil.escape(id), XmlUtil.escape(name), type, elData));
}
- } catch (Exception e) {
- throw new ELSQLException(e.getMessage());
- } finally {
- // 关闭连接
- LiteFlowJdbcUtil.close(conn, stmt, rs);
- }
-
- String chainsContent = CollUtil.join(result, StrUtil.EMPTY);
-
- String nodesContent;
- if (hasScriptData()) {
- nodesContent = getScriptNodes();
- } else {
- nodesContent = StrUtil.EMPTY;
- }
+ });
+ String nodesContent = StrUtil.format(NODE_XML_PATTERN, CollUtil.join(scriptList, StrUtil.EMPTY));
+ // 注册
+ SqlReadFactory.registerSqlReadPollTask(ReadType.CHAIN, chainMap);
+ SqlReadFactory.registerSqlReadPollTask(ReadType.SCRIPT, scriptMap);
return StrUtil.format(XML_PATTERN, nodesContent, chainsContent);
}
@@ -165,197 +109,22 @@ public class JDBCHelper {
* 定时轮询拉取SQL中变化的数据
*/
public void listenSQL() {
- //添加轮询chain的定时任务
- ChainPollingTask chainTask = new ChainPollingTask(sqlParserVO, chainSHAMap);
- pollExecutor.scheduleAtFixedRate(chainTask, sqlParserVO.getPollingStartSeconds().longValue(),
- sqlParserVO.getPollingIntervalSeconds().longValue(), TimeUnit.SECONDS);
- if (hasScriptData()) {
- //添加轮询script的定时任务
- ScriptPollingTask scriptTask = new ScriptPollingTask(sqlParserVO, scriptSHAMap);
- pollExecutor.scheduleAtFixedRate(scriptTask, sqlParserVO.getPollingStartSeconds().longValue(),
- sqlParserVO.getPollingIntervalSeconds().longValue(), TimeUnit.SECONDS);
- }
- }
+ // 添加轮询chain的定时任务
+ pollExecutor.scheduleAtFixedRate(
+ () -> SqlReadFactory.getSqlReadPollTask(ReadType.CHAIN).execute(),
+ sqlParserVO.getPollingStartSeconds().longValue(),
+ sqlParserVO.getPollingIntervalSeconds().longValue(),
+ TimeUnit.SECONDS
+ );
+ // 添加轮询script的定时任务
+ pollExecutor.scheduleAtFixedRate(
+ () -> SqlReadFactory.getSqlReadPollTask(ReadType.SCRIPT).execute(),
+ sqlParserVO.getPollingStartSeconds().longValue(),
+ sqlParserVO.getPollingIntervalSeconds().longValue(),
+ TimeUnit.SECONDS
+ );
- /**
- * 获取脚本节点内容
- */
- public String getScriptNodes() {
- String scriptLanguageField = sqlParserVO.getScriptLanguageField();
- if (StrUtil.isNotBlank(scriptLanguageField)) {
- return getScriptNodesWithLanguage();
- }
-
- List result = new ArrayList<>();
- Connection conn = null;
- PreparedStatement stmt = null;
- ResultSet rs = null;
-
- String scriptTableName = sqlParserVO.getScriptTableName();
- String scriptIdField = sqlParserVO.getScriptIdField();
- String scriptDataField = sqlParserVO.getScriptDataField();
- String scriptNameField = sqlParserVO.getScriptNameField();
- String scriptTypeField = sqlParserVO.getScriptTypeField();
- String scriptApplicationNameField = sqlParserVO.getScriptApplicationNameField();
- String applicationName = sqlParserVO.getApplicationName();
-
- if (StrUtil.isBlank(applicationName) || StrUtil.isBlank(scriptApplicationNameField)) {
- throw new ELSQLException("You did not define the applicationName or scriptApplicationNameField property");
- }
-
- String sqlCmd = StrUtil.format(SCRIPT_SQL_PATTERN, scriptIdField, scriptDataField, scriptNameField,
- scriptTypeField, scriptTableName, scriptApplicationNameField);
- try {
- conn = LiteFlowJdbcUtil.getConn(sqlParserVO);
- stmt = conn.prepareStatement(sqlCmd, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
- // 设置游标拉取数量
- stmt.setFetchSize(FETCH_SIZE_MAX);
- stmt.setString(1, applicationName);
- rs = stmt.executeQuery();
-
- while (rs.next()) {
- String id = getStringFromResultSet(rs, scriptIdField);
- String data = getStringFromResultSet(rs, scriptDataField);
- String name = getStringFromResultSet(rs, scriptNameField);
- String type = getStringFromResultSet(rs, scriptTypeField);
-
- NodeTypeEnum nodeTypeEnum = NodeTypeEnum.getEnumByCode(type);
- if (Objects.isNull(nodeTypeEnum)) {
- throw new ELSQLException(StrUtil.format("Invalid type value[{}]", type));
- }
-
- if (!nodeTypeEnum.isScript()) {
- throw new ELSQLException(StrUtil.format("The type value[{}] is not a script type", type));
- }
-
- result.add(StrUtil.format(NODE_ITEM_XML_PATTERN, XmlUtil.escape(id), XmlUtil.escape(name), type, data));
-
- //如果需要轮询 计算该scriptData的SHA值
- if(sqlParserVO.getPollingEnabled()){
- String scriptKey = StrUtil.join(":", id, type, name);
- String scriptSHA = DigestUtil.sha1Hex(data);
- scriptSHAMap.put(scriptKey, scriptSHA);
- }
- }
- } catch (Exception e) {
- throw new ELSQLException(e.getMessage());
- } finally {
- // 关闭连接
- LiteFlowJdbcUtil.close(conn, stmt, rs);
- }
- return StrUtil.format(NODE_XML_PATTERN, CollUtil.join(result, StrUtil.EMPTY));
- }
-
- /**
- * 获取脚本节点(带语言)
- *
- * @return
- */
- public String getScriptNodesWithLanguage() {
-
- List result = new ArrayList<>();
- Connection conn = null;
- PreparedStatement stmt = null;
- ResultSet rs = null;
-
- String scriptTableName = sqlParserVO.getScriptTableName();
- String scriptIdField = sqlParserVO.getScriptIdField();
- String scriptDataField = sqlParserVO.getScriptDataField();
- String scriptNameField = sqlParserVO.getScriptNameField();
- String scriptTypeField = sqlParserVO.getScriptTypeField();
- String scriptApplicationNameField = sqlParserVO.getScriptApplicationNameField();
- String applicationName = sqlParserVO.getApplicationName();
- String scriptLanguageField = sqlParserVO.getScriptLanguageField();
-
- if (StrUtil.isBlank(applicationName) || StrUtil.isBlank(scriptApplicationNameField)) {
- throw new ELSQLException("You did not define the applicationName or scriptApplicationNameField property");
- }
-
- String sqlCmd = StrUtil.format(SCRIPT_WITH_LANGUAG_SQL_PATTERN, scriptIdField, scriptDataField, scriptNameField,
- scriptTypeField, scriptLanguageField, scriptTableName, scriptApplicationNameField);
- try {
- conn = LiteFlowJdbcUtil.getConn(sqlParserVO);
- stmt = conn.prepareStatement(sqlCmd, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
- // 设置游标拉取数量
- stmt.setFetchSize(FETCH_SIZE_MAX);
- stmt.setString(1, applicationName);
- rs = stmt.executeQuery();
-
- while (rs.next()) {
- String id = getStringFromResultSet(rs, scriptIdField);
- String data = getStringFromResultSet(rs, scriptDataField);
- String name = getStringFromResultSet(rs, scriptNameField);
- String type = getStringFromResultSet(rs, scriptTypeField);
- String language = getStringFromResultSet(rs, scriptLanguageField);
-
- NodeTypeEnum nodeTypeEnum = NodeTypeEnum.getEnumByCode(type);
- if (Objects.isNull(nodeTypeEnum)) {
- throw new ELSQLException(StrUtil.format("Invalid type value[{}]", type));
- }
-
- if (!nodeTypeEnum.isScript()) {
- throw new ELSQLException(StrUtil.format("The type value[{}] is not a script type", type));
- }
-
- if (!ScriptTypeEnum.checkScriptType(language)) {
- throw new ELSQLException(StrUtil.format("The language value[{}] is invalid", language));
- }
-
- result.add(StrUtil.format(NODE_ITEM_WITH_LANGUAGE_XML_PATTERN, XmlUtil.escape(id), XmlUtil.escape(name),
- type, language, data));
-
- //如果需要轮询 计算该scriptData的SHA值
- if(sqlParserVO.getPollingEnabled()){
- String scriptKey = StrUtil.join(":", id, type, name, language);
- String scriptSHA = DigestUtil.sha1Hex(data);
- scriptSHAMap.put(scriptKey, scriptSHA);
- }
- }
- } catch (Exception e) {
- throw new ELSQLException(e.getMessage());
- } finally {
- // 关闭连接
- LiteFlowJdbcUtil.close(conn, stmt, rs);
- }
- return StrUtil.format(NODE_XML_PATTERN, CollUtil.join(result, StrUtil.EMPTY));
- }
-
- private boolean hasScriptData() {
- if (StrUtil.isBlank(sqlParserVO.getScriptTableName())) {
- return false;
- }
-
- Connection conn = null;
- PreparedStatement stmt = null;
- ResultSet rs = null;
- String sqlCmd = StrUtil.format(SCRIPT_SQL_CHECK_PATTERN, sqlParserVO.getScriptTableName(),
- sqlParserVO.getScriptApplicationNameField());
- try {
- conn = LiteFlowJdbcUtil.getConn(sqlParserVO);
- stmt = conn.prepareStatement(sqlCmd, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
- stmt.setFetchSize(1);
- stmt.setString(1, sqlParserVO.getApplicationName());
- rs = stmt.executeQuery();
- return rs.next();
- } catch (Exception e) {
- return false;
- } finally {
- // 关闭连接
- LiteFlowJdbcUtil.close(conn, stmt, rs);
- }
- }
-
- private String getStringFromResultSet(ResultSet rs, String field) throws SQLException {
- String data = rs.getString(field);
- if (StrUtil.isBlank(data)) {
- throw new ELSQLException(StrUtil.format("exist {} field value is empty", field));
- }
- return data;
- }
-
- private SQLParserVO getSqlParserVO() {
- return sqlParserVO;
}
private void setSqlParserVO(SQLParserVO sqlParserVO) {
diff --git a/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/util/ScriptPollingTask.java b/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/util/ScriptPollingTask.java
deleted file mode 100644
index 5a68e6f7f..000000000
--- a/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/util/ScriptPollingTask.java
+++ /dev/null
@@ -1,145 +0,0 @@
-package com.yomahub.liteflow.parser.sql.util;
-
-import cn.hutool.core.util.StrUtil;
-import cn.hutool.crypto.digest.DigestUtil;
-import com.yomahub.liteflow.flow.FlowBus;
-import com.yomahub.liteflow.log.LFLog;
-import com.yomahub.liteflow.log.LFLoggerManager;
-import com.yomahub.liteflow.parser.helper.NodeConvertHelper;
-import com.yomahub.liteflow.parser.sql.exception.ELSQLException;
-import com.yomahub.liteflow.parser.sql.vo.SQLParserVO;
-
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.*;
-
-/**
- * 用于轮询script的定时任务
- *
- * @author hxinyu
- * @since 2.11.1
- */
-public class ScriptPollingTask implements Runnable {
-
- private static final String SQL_PATTERN = "SELECT {},{} FROM {} WHERE {}=?";
-
- private static final String CONCAT_PATTERN = "CONCAT_WS(':',{},{},{}) as script_concat";
-
- private static final String CONCAT_WITH_LANGUAGE_PATTERN = "CONCAT_WS(':',{},{},{},{}) as script_concat";
-
- private static final String SCRIPT_KEY_FIELD = "script_concat";
-
- private Connection conn;
-
- private SQLParserVO sqlParserVO;
-
- private Map scriptSHAMap;
-
- private static final Integer FETCH_SIZE_MAX = 1000;
-
- LFLog LOG = LFLoggerManager.getLogger(ScriptPollingTask.class);
-
- public ScriptPollingTask(SQLParserVO sqlParserVO, Map scriptSHAMap) {
- this.sqlParserVO = sqlParserVO;
- this.scriptSHAMap = scriptSHAMap;
- }
-
-
- @Override
- public void run() {
- conn = LiteFlowJdbcUtil.getConn(sqlParserVO);
- PreparedStatement stmt = null;
- ResultSet rs = null;
- try {
- String scriptTableName = sqlParserVO.getScriptTableName();
- String scriptIdField = sqlParserVO.getScriptIdField();
- String scriptDataField = sqlParserVO.getScriptDataField();
- String scriptNameField = sqlParserVO.getScriptNameField();
- String scriptTypeField = sqlParserVO.getScriptTypeField();
- String scriptApplicationNameField = sqlParserVO.getScriptApplicationNameField();
- String applicationName = sqlParserVO.getApplicationName();
- String scriptLanguageField = sqlParserVO.getScriptLanguageField();
-
- String KeyField;
- if (StrUtil.isNotBlank(scriptLanguageField)) {
- KeyField = StrUtil.format(CONCAT_WITH_LANGUAGE_PATTERN, scriptIdField, scriptTypeField, scriptNameField, scriptLanguageField);
- } else {
- KeyField = StrUtil.format(CONCAT_PATTERN, scriptIdField, scriptTypeField, scriptNameField);
- }
-
- String sqlCmd = StrUtil.format(SQL_PATTERN, KeyField, scriptDataField, scriptTableName, scriptApplicationNameField);
- stmt = conn.prepareStatement(sqlCmd, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
- // 设置游标拉取数量
- stmt.setFetchSize(FETCH_SIZE_MAX);
- stmt.setString(1, applicationName);
- rs = stmt.executeQuery();
-
- Set newScriptSet = new HashSet<>();
-
- while (rs.next()) {
- String scriptKey = getStringFromResultSet(rs, SCRIPT_KEY_FIELD);
- String newData = getStringFromResultSet(rs, scriptDataField);
- String newSHA = DigestUtil.sha1Hex(newData);
- newScriptSet.add(scriptKey);
- //如果封装的SHAMap中不存在该script 表示该script为新增
- if (!scriptSHAMap.containsKey(scriptKey)) {
- NodeConvertHelper.NodeSimpleVO scriptVO = NodeConvertHelper.convert(scriptKey);
- //新增script
- NodeConvertHelper.changeScriptNode(scriptVO, newData);
- LOG.info("starting reload flow config... create script={}, new value={},", scriptKey, newData);
-
- //加入到shaMap
- scriptSHAMap.put(scriptKey, newSHA);
- }
- else if (!StrUtil.equals(newSHA, scriptSHAMap.get(scriptKey))) {
- //SHA值发生变化,表示该script的值已被修改,重新拉取变化的script
- NodeConvertHelper.NodeSimpleVO scriptVO = NodeConvertHelper.convert(scriptKey);
- //修改script
- NodeConvertHelper.changeScriptNode(scriptVO, newData);
- LOG.info("starting reload flow config... update scriptId={}, new value={},", scriptVO.getNodeId(), newData);
-
- //修改shaMap
- scriptSHAMap.put(scriptKey, newSHA);
- }
- //SHA值无变化,表示该chain未改变
- }
-
- if(scriptSHAMap.size() > newScriptSet.size()) {
- //如果遍历prepareStatement后修改过的SHAMap数量比最新script总数多, 说明有两种情况:
- // 1、删除了script
- // 2、修改了script的id/name/type:因为遍历到新的script_key时会加到SHAMap里,但没有机会删除旧的script
- // 3、上述两者结合
- //在此处遍历scriptSHAMap,把不在newScriptSet中的script删除
- //这里用iterator是为避免在遍历集合时删除元素导致ConcurrentModificationException
- Iterator iterator = scriptSHAMap.keySet().iterator();
- while(iterator.hasNext()){
- String scriptKey = iterator.next();
- if (!newScriptSet.contains(scriptKey)) {
- NodeConvertHelper.NodeSimpleVO scriptVO = NodeConvertHelper.convert(scriptKey);
- //删除script
- FlowBus.getNodeMap().remove(scriptVO.getNodeId());
- LOG.info("starting reload flow config... delete script={}", scriptKey);
- //修改SHAMap
- iterator.remove();
- }
- }
- }
-
- } catch (Exception e) {
- LOG.error("[Exception during SQL script polling] " + e.getMessage(), e);
- } finally {
- // 关闭连接
- LiteFlowJdbcUtil.close(conn, stmt, rs);
- }
- }
-
- private String getStringFromResultSet(ResultSet rs, String field) throws SQLException {
- String data = rs.getString(field);
- if (StrUtil.isBlank(data)) {
- throw new ELSQLException(StrUtil.format("exist {} field value is empty", field));
- }
- return data;
- }
-}
diff --git a/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/vo/SQLParserVO.java b/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/vo/SQLParserVO.java
index b15d68b72..8824328dd 100644
--- a/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/vo/SQLParserVO.java
+++ b/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/vo/SQLParserVO.java
@@ -90,15 +90,26 @@ public class SQLParserVO {
*/
private String scriptLanguageField;
- /*轮询机制是否开启 默认不开启*/
+ /**
+ * 轮询机制是否开启 默认不开启
+ */
private Boolean pollingEnabled = false;
- /*轮询时间间隔(s) 默认120s*/
+ /**
+ * 轮询时间间隔(s) 默认60s
+ */
private Integer pollingIntervalSeconds = 60;
- /*规则配置后首次轮询的起始时间 默认为60s*/
+ /**
+ * 规则配置后首次轮询的起始时间 默认为60s
+ */
private Integer pollingStartSeconds = 60;
+ /**
+ * 是否开启sql日志
+ */
+ private Boolean sqlLogEnabled = true;
+
public String getUrl() {
return url;
}
@@ -257,4 +268,12 @@ public class SQLParserVO {
public void setPollingStartSeconds(Integer pollingStartSeconds) {
this.pollingStartSeconds = pollingStartSeconds;
}
+
+ public Boolean getSqlLogEnabled() {
+ return sqlLogEnabled;
+ }
+
+ public void setSqlLogEnabled(Boolean sqlLogEnabled) {
+ this.sqlLogEnabled = sqlLogEnabled;
+ }
}