diff --git a/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/constant/ReadType.java b/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/constant/ReadType.java new file mode 100644 index 000000000..a687d53d1 --- /dev/null +++ b/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/constant/ReadType.java @@ -0,0 +1,14 @@ +package com.yomahub.liteflow.parser.constant; + +/** + * Copyright (C), 2021, 北京同创永益科技发展有限公司 + * + * @author tangkc + * @version 3.0.0 + * @description + * @date 2023/9/28 11:43 + */ +public enum ReadType { + CHAIN, + SCRIPT; +} diff --git a/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/constant/SqlReadConstant.java b/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/constant/SqlReadConstant.java new file mode 100644 index 000000000..127bc0084 --- /dev/null +++ b/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/constant/SqlReadConstant.java @@ -0,0 +1,32 @@ +package com.yomahub.liteflow.parser.constant; + +/** + * Copyright (C), 2021, 北京同创永益科技发展有限公司 + * + * @author tangkc + * @version 3.0.0 + * @description + * @date 2023/9/28 11:42 + */ +public class SqlReadConstant { + + public static final String SQL_PATTERN = "SELECT {},{} FROM {} WHERE {}=?"; + + public static final String SCRIPT_SQL_CHECK_PATTERN = "SELECT 1 FROM {} "; + + public static final String SCRIPT_SQL_PATTERN = "SELECT {},{},{},{} FROM {} WHERE {}=?"; + + public static final String SCRIPT_WITH_LANGUAGE_SQL_PATTERN = "SELECT {},{},{},{},{} FROM {} WHERE {}=?"; + + public static final String CHAIN_XML_PATTERN = ""; + + public static final String NODE_XML_PATTERN = "{}"; + + public static final String NODE_ITEM_XML_PATTERN = ""; + + public static final String NODE_ITEM_WITH_LANGUAGE_XML_PATTERN = ""; + + public static final String XML_PATTERN = "{}{}"; + + public static final Integer FETCH_SIZE_MAX = 1000; +} diff --git a/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/SQLXmlELParser.java b/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/SQLXmlELParser.java index 98fa7f640..13301dbd5 100644 --- a/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/SQLXmlELParser.java +++ b/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/SQLXmlELParser.java @@ -8,6 +8,7 @@ import cn.hutool.core.util.StrUtil; import com.yomahub.liteflow.core.FlowInitHook; import com.yomahub.liteflow.parser.el.ClassXmlFlowELParser; import com.yomahub.liteflow.parser.sql.exception.ELSQLException; +import com.yomahub.liteflow.parser.sql.read.SqlReadFactory; import com.yomahub.liteflow.parser.sql.util.JDBCHelper; import com.yomahub.liteflow.parser.sql.vo.SQLParserVO; import com.yomahub.liteflow.property.LiteflowConfig; @@ -53,6 +54,9 @@ public class SQLXmlELParser extends ClassXmlFlowELParser { // 初始化 JDBCHelper JDBCHelper.init(sqlParserVO); + + // 初始化 SqlReadFactory + SqlReadFactory.registerRead(sqlParserVO); } catch (ELSQLException elsqlException) { throw elsqlException; diff --git a/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/polling/AbstractSqlReadPollTask.java b/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/polling/AbstractSqlReadPollTask.java new file mode 100644 index 000000000..1fc75902d --- /dev/null +++ b/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/polling/AbstractSqlReadPollTask.java @@ -0,0 +1,98 @@ +package com.yomahub.liteflow.parser.sql.polling; + +import cn.hutool.core.collection.CollUtil; +import cn.hutool.core.map.MapUtil; +import cn.hutool.core.util.StrUtil; +import cn.hutool.crypto.digest.DigestUtil; +import com.yomahub.liteflow.parser.sql.exception.ELSQLException; +import com.yomahub.liteflow.parser.sql.read.SqlRead; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * Copyright (C), 2021, 北京同创永益科技发展有限公司 + * + * @author tangkc + * @version 3.0.0 + * @description + * @date 2023/9/28 14:12 + */ +public abstract class AbstractSqlReadPollTask implements SqlReadPollTask { + private final Map DATA_SHA_MAP; + private final SqlRead read; + + public AbstractSqlReadPollTask(Map dataMap, SqlRead read) { + this.read = read; + this.DATA_SHA_MAP = shaMapValue(dataMap); + + if (!read.type().equals(type())) { + throw new ELSQLException("SqlReadPollTask type not match"); + } + } + + @Override + public void execute() { + Map newData = read.read(); + // 新增或者更新的元素 + Map saveElementMap = new HashMap(); + // 删除的元素 + List deleteElementIds = new ArrayList<>(); + + for (Map.Entry entry : newData.entrySet()) { + String id = entry.getKey(); + String element = entry.getValue(); + String newSHA = DigestUtil.sha1Hex(element); + + // 新增 + // 如果封装的SHAMap中不存在该chain, 表示该元素为新增 + if (!DATA_SHA_MAP.containsKey(id)) { + saveElementMap.put(id, element); + + DATA_SHA_MAP.put(id, newSHA); + } + // 修改 + // SHA值发生变化,表示该元素的值已被修改,重新拉取变化的chain + else if (!StrUtil.equals(newSHA, DATA_SHA_MAP.get(id))) { + saveElementMap.put(id, element); + + DATA_SHA_MAP.put(id, newSHA); + } + } + + Set oldIdList = DATA_SHA_MAP.keySet(); // 旧的 id 列表 + Set newIdList = newData.keySet(); // 新的 id 列表 + // 计算单差集 + // 计算集合的单差集,即只返回【newIdList】中有,但是【oldIdList】中没有的元素,例如: + // subtractToList([1,2,3,4],[2,3,4,5]) -》 [1] + deleteElementIds = CollUtil.subtractToList(newIdList, oldIdList); + + for (String id : deleteElementIds) { + DATA_SHA_MAP.remove(id); + } + + if (CollUtil.isNotEmpty(saveElementMap)) { + doSave(saveElementMap); + } + + if (CollUtil.isNotEmpty(deleteElementIds)) { + doDelete(deleteElementIds); + } + } + + public abstract void doSave(Map saveElementMap); + + public abstract void doDelete(List deleteElementId); + + private Map shaMapValue(Map dataMap) { + Map result = new HashMap<>(); + dataMap.forEach((k, v) -> { + result.put(k, DigestUtil.sha1Hex(v)); + }); + + return result; + } +} diff --git a/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/polling/SqlReadPollTask.java b/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/polling/SqlReadPollTask.java new file mode 100644 index 000000000..898dbfb21 --- /dev/null +++ b/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/polling/SqlReadPollTask.java @@ -0,0 +1,18 @@ +package com.yomahub.liteflow.parser.sql.polling; + +import com.yomahub.liteflow.parser.constant.ReadType; + +/** + * Copyright (C), 2021, 北京同创永益科技发展有限公司 + * + * @author tangkc + * @version 3.0.0 + * @description + * @date 2023/9/28 14:10 + */ +public interface SqlReadPollTask { + + void execute(); + + ReadType type(); +} diff --git a/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/polling/impl/ChainReadPollTask.java b/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/polling/impl/ChainReadPollTask.java new file mode 100644 index 000000000..1eaac11c4 --- /dev/null +++ b/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/polling/impl/ChainReadPollTask.java @@ -0,0 +1,56 @@ +package com.yomahub.liteflow.parser.sql.polling.impl; + +import com.yomahub.liteflow.builder.el.LiteFlowChainELBuilder; +import com.yomahub.liteflow.flow.FlowBus; +import com.yomahub.liteflow.parser.constant.ReadType; +import com.yomahub.liteflow.parser.helper.ParserHelper; +import com.yomahub.liteflow.parser.sql.polling.AbstractSqlReadPollTask; +import com.yomahub.liteflow.parser.sql.polling.SqlReadPollTask; +import com.yomahub.liteflow.parser.sql.read.SqlRead; +import org.dom4j.Document; +import org.dom4j.DocumentHelper; + +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static com.yomahub.liteflow.common.ChainConstant.ID; +import static com.yomahub.liteflow.common.ChainConstant.NAME; + +/** + * Copyright (C), 2021, 北京同创永益科技发展有限公司 + * + * @author tangkc + * @version 3.0.0 + * @description + * @date 2023/9/28 14:46 + */ +public class ChainReadPollTask extends AbstractSqlReadPollTask { + + public ChainReadPollTask(Map dataMap, SqlRead read) { + super(dataMap, read); + } + + @Override + public void doSave(Map saveElementMap) { + for (Map.Entry entry : saveElementMap.entrySet()) { + String chainName = entry.getKey(); + String newData = entry.getValue(); + + LiteFlowChainELBuilder.createChain().setChainId(chainName).setEL(newData).build(); + } + } + + @Override + public void doDelete(List deleteElementId) { + for (String id : deleteElementId) { + FlowBus.removeChain(id); + } + } + + @Override + public ReadType type() { + return ReadType.CHAIN; + } + +} diff --git a/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/polling/impl/ScriptReadPollTask.java b/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/polling/impl/ScriptReadPollTask.java new file mode 100644 index 000000000..41dd5b4c4 --- /dev/null +++ b/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/polling/impl/ScriptReadPollTask.java @@ -0,0 +1,50 @@ +package com.yomahub.liteflow.parser.sql.polling.impl; + +import com.yomahub.liteflow.flow.FlowBus; +import com.yomahub.liteflow.parser.constant.ReadType; +import com.yomahub.liteflow.parser.helper.NodeConvertHelper; +import com.yomahub.liteflow.parser.sql.polling.AbstractSqlReadPollTask; +import com.yomahub.liteflow.parser.sql.read.SqlRead; + +import java.util.List; +import java.util.Map; + +/** + * Copyright (C), 2021, 北京同创永益科技发展有限公司 + * + * @author tangkc + * @version 3.0.0 + * @description + * @date 2023/9/28 15:03 + */ +public class ScriptReadPollTask extends AbstractSqlReadPollTask { + public ScriptReadPollTask(Map dataMap, SqlRead read) { + super(dataMap, read); + } + + @Override + public void doSave(Map saveElementMap) { + for (Map.Entry entry : saveElementMap.entrySet()) { + String scriptKey = entry.getKey(); + String newData = entry.getValue(); + + NodeConvertHelper.NodeSimpleVO scriptVO = NodeConvertHelper.convert(scriptKey); + NodeConvertHelper.changeScriptNode(scriptVO, newData); + } + } + + @Override + public void doDelete(List deleteElementId) { + for (String id : deleteElementId) { + NodeConvertHelper.NodeSimpleVO scriptVO = NodeConvertHelper.convert(id); + + // 删除script + FlowBus.getNodeMap().remove(scriptVO.getNodeId()); + } + } + + @Override + public ReadType type() { + return ReadType.SCRIPT; + } +} diff --git a/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/read/AbstractSqlRead.java b/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/read/AbstractSqlRead.java new file mode 100644 index 000000000..e917a6161 --- /dev/null +++ b/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/read/AbstractSqlRead.java @@ -0,0 +1,98 @@ +package com.yomahub.liteflow.parser.sql.read; + +import cn.hutool.core.util.StrUtil; +import com.yomahub.liteflow.log.LFLog; +import com.yomahub.liteflow.log.LFLoggerManager; +import com.yomahub.liteflow.parser.constant.SqlReadConstant; +import com.yomahub.liteflow.parser.sql.exception.ELSQLException; +import com.yomahub.liteflow.parser.sql.read.impl.ScriptRead; +import com.yomahub.liteflow.parser.sql.util.LiteFlowJdbcUtil; +import com.yomahub.liteflow.parser.sql.vo.SQLParserVO; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.HashMap; +import java.util.Map; + +/** + * Copyright (C), 2021, 北京同创永益科技发展有限公司 + * + * @author tangkc + * @version 3.0.0 + * @description + * @date 2023/9/28 11:26 + */ +public abstract class AbstractSqlRead implements SqlRead { + public final SQLParserVO config; + private static LFLog LOG = LFLoggerManager.getLogger(AbstractSqlRead.class); + + public AbstractSqlRead(SQLParserVO config) { + this.config = config; + } + + @Override + public Map read() { + // 如果不需要读取直接返回 + if (!needRead()) { + return new HashMap<>(); + } + + Map result = new HashMap<>(); + String sqlCmd = buildQuerySql(); + if (config.getSqlLogEnabled()) { + LOG.info("query sql:{}", sqlCmd.replace("?", "'" + config.getApplicationName() + "'")); + } + + Connection conn = null; + PreparedStatement stmt = null; + ResultSet rs = null; + try { + conn = LiteFlowJdbcUtil.getConn(config); + stmt = conn.prepareStatement(sqlCmd, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + // 设置游标拉取数量 + stmt.setFetchSize(SqlReadConstant.FETCH_SIZE_MAX); + stmt.setString(1, config.getApplicationName()); + rs = stmt.executeQuery(); + + while (rs.next()) { + String xml = buildXmlElement(rs); + String uniqueKey = buildXmlElementUniqueKey(rs); + + result.put(uniqueKey, xml); + } + } catch (Exception e) { + throw new ELSQLException(e.getMessage()); + } finally { + // 关闭连接 + LiteFlowJdbcUtil.close(conn, stmt, rs); + } + + return result; + } + + public abstract String buildQuerySql(); + + public abstract String buildXmlElement(ResultSet rs) throws SQLException; + + public abstract String buildXmlElementUniqueKey(ResultSet rs) throws SQLException; + + /** + * 是否可以读取 + * chain 默认可以读取 + * script 需要判断是否有配置 + */ + public boolean needRead() { + return true; + } + + + public String getStringFromResultSet(ResultSet rs, String field) throws SQLException { + String data = rs.getString(field); + if (StrUtil.isBlank(data)) { + throw new ELSQLException(StrUtil.format("exist {} field value is empty", field)); + } + return data; + } +} diff --git a/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/read/SqlRead.java b/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/read/SqlRead.java new file mode 100644 index 000000000..c94c54570 --- /dev/null +++ b/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/read/SqlRead.java @@ -0,0 +1,20 @@ +package com.yomahub.liteflow.parser.sql.read; + +import com.yomahub.liteflow.parser.constant.ReadType; + +import java.util.Map; + +/** + * Copyright (C), 2021, 北京同创永益科技发展有限公司 + * + * @author tangkc + * @version 3.0.0 + * @description + * @date 2023/9/28 11:25 + */ +public interface SqlRead { + + Map read(); + + ReadType type(); +} diff --git a/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/read/SqlReadFactory.java b/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/read/SqlReadFactory.java new file mode 100644 index 000000000..4f4f531e4 --- /dev/null +++ b/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/read/SqlReadFactory.java @@ -0,0 +1,48 @@ +package com.yomahub.liteflow.parser.sql.read; + +import com.yomahub.liteflow.parser.constant.ReadType; +import com.yomahub.liteflow.parser.sql.polling.SqlReadPollTask; +import com.yomahub.liteflow.parser.sql.polling.impl.ChainReadPollTask; +import com.yomahub.liteflow.parser.sql.polling.impl.ScriptReadPollTask; +import com.yomahub.liteflow.parser.sql.read.impl.ChainRead; +import com.yomahub.liteflow.parser.sql.read.impl.ScriptRead; +import com.yomahub.liteflow.parser.sql.vo.SQLParserVO; + +import java.util.HashMap; +import java.util.Map; + +/** + * Copyright (C), 2021, 北京同创永益科技发展有限公司 + * + * @author tangkc + * @version 3.0.0 + * @description + * @date 2023/9/28 15:42 + */ +public class SqlReadFactory { + private static final Map READ_MAP = new HashMap<>(); + private static final Map POLL_TASK_MAP = new HashMap<>(); + + public static void registerRead(SQLParserVO config) { + READ_MAP.put(ReadType.CHAIN, new ChainRead(config)); + READ_MAP.put(ReadType.SCRIPT, new ScriptRead(config)); + } + + public static void registerSqlReadPollTask(ReadType readType, Map dataMap) { + SqlRead sqlRead = getSqlRead(readType); + if (ReadType.CHAIN.equals(readType)) { + POLL_TASK_MAP.put(ReadType.CHAIN, new ChainReadPollTask(dataMap, sqlRead)); + } else if (ReadType.SCRIPT.equals(readType)) { + POLL_TASK_MAP.put(ReadType.SCRIPT, new ScriptReadPollTask(dataMap, sqlRead)); + } + + } + + public static SqlRead getSqlRead(ReadType readType) { + return READ_MAP.get(readType); + } + + public static SqlReadPollTask getSqlReadPollTask(ReadType readType) { + return POLL_TASK_MAP.get(readType); + } +} diff --git a/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/read/impl/ChainRead.java b/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/read/impl/ChainRead.java new file mode 100644 index 000000000..a8c776b3e --- /dev/null +++ b/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/read/impl/ChainRead.java @@ -0,0 +1,68 @@ +package com.yomahub.liteflow.parser.sql.read.impl; + +import cn.hutool.core.util.StrUtil; +import cn.hutool.core.util.XmlUtil; +import com.yomahub.liteflow.parser.constant.ReadType; +import com.yomahub.liteflow.parser.constant.SqlReadConstant; +import com.yomahub.liteflow.parser.sql.exception.ELSQLException; +import com.yomahub.liteflow.parser.sql.read.AbstractSqlRead; +import com.yomahub.liteflow.parser.sql.vo.SQLParserVO; + +import java.sql.ResultSet; +import java.sql.SQLException; + +/** + * Copyright (C), 2021, 北京同创永益科技发展有限公司 + * + * @author tangkc + * @version 3.0.0 + * @description + * @date 2023/9/28 11:44 + */ +public class ChainRead extends AbstractSqlRead { + + public ChainRead(SQLParserVO config) { + super(config); + } + + @Override + public String buildQuerySql() { + String chainTableName = super.config.getChainTableName(); + String elDataField = super.config.getElDataField(); + String chainNameField = super.config.getChainNameField(); + String chainApplicationNameField = super.config.getChainApplicationNameField(); + String applicationName = super.config.getApplicationName(); + + if (StrUtil.isBlank(chainTableName)) { + throw new ELSQLException("You did not define the chainTableName property"); + } + + if (StrUtil.isBlank(applicationName) || StrUtil.isBlank(chainApplicationNameField)) { + throw new ELSQLException("You did not define the applicationName or chainApplicationNameField property"); + } + + String sqlCmd = StrUtil.format(SqlReadConstant.SQL_PATTERN, chainNameField, elDataField, chainTableName, + chainApplicationNameField); + + return sqlCmd; + } + + @Override + public String buildXmlElement(ResultSet rs) throws SQLException { + String elDataField = super.config.getElDataField(); + + return getStringFromResultSet(rs, elDataField); + } + + @Override + public String buildXmlElementUniqueKey(ResultSet rs) throws SQLException { + String chainNameField = super.config.getChainNameField(); + + return getStringFromResultSet(rs, chainNameField); + } + + @Override + public ReadType type() { + return ReadType.CHAIN; + } +} diff --git a/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/read/impl/ScriptRead.java b/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/read/impl/ScriptRead.java new file mode 100644 index 000000000..8c04d5086 --- /dev/null +++ b/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/read/impl/ScriptRead.java @@ -0,0 +1,151 @@ +package com.yomahub.liteflow.parser.sql.read.impl; + +import cn.hutool.core.collection.CollUtil; +import cn.hutool.core.util.StrUtil; +import cn.hutool.core.util.XmlUtil; +import com.yomahub.liteflow.enums.NodeTypeEnum; +import com.yomahub.liteflow.enums.ScriptTypeEnum; +import com.yomahub.liteflow.log.LFLog; +import com.yomahub.liteflow.log.LFLoggerManager; +import com.yomahub.liteflow.parser.constant.ReadType; +import com.yomahub.liteflow.parser.constant.SqlReadConstant; +import com.yomahub.liteflow.parser.sql.exception.ELSQLException; +import com.yomahub.liteflow.parser.sql.read.AbstractSqlRead; +import com.yomahub.liteflow.parser.sql.util.LiteFlowJdbcUtil; +import com.yomahub.liteflow.parser.sql.vo.SQLParserVO; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.List; +import java.util.Objects; + +/** + * Copyright (C), 2021, 北京同创永益科技发展有限公司 + * + * @author tangkc + * @version 3.0.0 + * @description + * @date 2023/9/28 11:49 + */ +public class ScriptRead extends AbstractSqlRead { + + public ScriptRead(SQLParserVO config) { + super(config); + } + + @Override + public String buildQuerySql() { + String scriptLanguageField = super.config.getScriptLanguageField(); + String scriptTableName = super.config.getScriptTableName(); + String scriptIdField = super.config.getScriptIdField(); + String scriptDataField = super.config.getScriptDataField(); + String scriptNameField = super.config.getScriptNameField(); + String scriptTypeField = super.config.getScriptTypeField(); + String scriptApplicationNameField = super.config.getScriptApplicationNameField(); + String applicationName = super.config.getApplicationName(); + + if (StrUtil.isBlank(applicationName) || StrUtil.isBlank(scriptApplicationNameField)) { + throw new ELSQLException("You did not define the applicationName or scriptApplicationNameField property"); + } + + String sqlCmd = null; + // 脚本节点(带语言) + if (withLanguage()) { + sqlCmd = StrUtil.format( + SqlReadConstant.SCRIPT_WITH_LANGUAGE_SQL_PATTERN, + scriptIdField, + scriptDataField, + scriptNameField, + scriptTypeField, + scriptLanguageField, + scriptTableName, + scriptApplicationNameField + ); + + return sqlCmd; + } + // 脚本节点(不带语言) + else { + sqlCmd = StrUtil.format( + SqlReadConstant.SCRIPT_SQL_PATTERN, + scriptIdField, + scriptDataField, + scriptNameField, + scriptTypeField, + scriptTableName, + scriptApplicationNameField + ); + } + + + return sqlCmd; + } + + @Override + public String buildXmlElement(ResultSet rs) throws SQLException { + String scriptDataField = super.config.getScriptDataField(); + + return getStringFromResultSet(rs, scriptDataField); + + } + + @Override + public String buildXmlElementUniqueKey(ResultSet rs) throws SQLException { + String scriptIdField = super.config.getScriptIdField(); + String scriptNameField = super.config.getScriptNameField(); + String scriptTypeField = super.config.getScriptTypeField(); + String scriptLanguageField = super.config.getScriptLanguageField(); + + String id = getStringFromResultSet(rs, scriptIdField); + String name = getStringFromResultSet(rs, scriptNameField); + String type = getStringFromResultSet(rs, scriptTypeField); + String language = withLanguage() ? getStringFromResultSet(rs, scriptLanguageField) : null; + + NodeTypeEnum nodeTypeEnum = NodeTypeEnum.getEnumByCode(type); + if (Objects.isNull(nodeTypeEnum)) { + throw new ELSQLException(StrUtil.format("Invalid type value[{}]", type)); + } + + if (!nodeTypeEnum.isScript()) { + throw new ELSQLException(StrUtil.format("The type value[{}] is not a script type", type)); + } + + if (withLanguage() && !ScriptTypeEnum.checkScriptType(language)) { + throw new ELSQLException(StrUtil.format("The language value[{}] is invalid", language)); + } + List keys = CollUtil.newArrayList(id, type, name); + if (StrUtil.isNotBlank(language)) { + keys.add(language); + } + + return StrUtil.join(StrUtil.COLON, keys); + } + + @Override + public boolean needRead() { + if (StrUtil.isBlank(super.config.getScriptTableName())) { + return false; + } + + String sqlCmd = StrUtil.format( + SqlReadConstant.SCRIPT_SQL_CHECK_PATTERN, + super.config.getScriptTableName() + ); + + Connection conn = LiteFlowJdbcUtil.getConn(super.config); + return LiteFlowJdbcUtil.checkConnectionCanExecuteSql(conn, sqlCmd); + } + + @Override + public ReadType type() { + return ReadType.SCRIPT; + } + + /** + * 脚本是否带语言 + */ + private boolean withLanguage() { + return StrUtil.isNotBlank(super.config.getScriptLanguageField()); + } +} diff --git a/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/util/ChainPollingTask.java b/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/util/ChainPollingTask.java deleted file mode 100644 index ac6b9be84..000000000 --- a/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/util/ChainPollingTask.java +++ /dev/null @@ -1,126 +0,0 @@ -package com.yomahub.liteflow.parser.sql.util; - -import cn.hutool.core.util.StrUtil; -import cn.hutool.crypto.digest.DigestUtil; -import com.yomahub.liteflow.builder.el.LiteFlowChainELBuilder; -import com.yomahub.liteflow.flow.FlowBus; -import com.yomahub.liteflow.log.LFLog; -import com.yomahub.liteflow.log.LFLoggerManager; -import com.yomahub.liteflow.parser.sql.exception.ELSQLException; -import com.yomahub.liteflow.parser.sql.vo.SQLParserVO; - -import java.sql.Connection; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.util.HashSet; -import java.util.Iterator; -import java.util.Map; -import java.util.Set; - -/** - * 用于轮询chain的定时任务 - * - * @author hxinyu - * @since 2.11.1 - */ -public class ChainPollingTask implements Runnable { - - private static final String SQL_PATTERN = "SELECT {},{} FROM {} WHERE {}=?"; - - private Connection conn; - - private SQLParserVO sqlParserVO; - - private Map chainSHAMap; - - private static final Integer FETCH_SIZE_MAX = 1000; - - LFLog LOG = LFLoggerManager.getLogger(ChainPollingTask.class); - - public ChainPollingTask(SQLParserVO sqlParserVO, Map chainSHAMap) { - this.sqlParserVO = sqlParserVO; - this.chainSHAMap = chainSHAMap; - } - - @Override - public void run() { - conn = LiteFlowJdbcUtil.getConn(sqlParserVO); - PreparedStatement stmt = null; - ResultSet rs = null; - try{ - String chainTableName = sqlParserVO.getChainTableName(); - String elDataField = sqlParserVO.getElDataField(); - String chainNameField = sqlParserVO.getChainNameField(); - String chainApplicationNameField = sqlParserVO.getChainApplicationNameField(); - String applicationName = sqlParserVO.getApplicationName(); - - String sqlCmd = StrUtil.format(SQL_PATTERN, chainNameField, elDataField, chainTableName, - chainApplicationNameField); - stmt = conn.prepareStatement(sqlCmd, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); - // 设置游标拉取数量 - stmt.setFetchSize(FETCH_SIZE_MAX); - stmt.setString(1, applicationName); - rs = stmt.executeQuery(); - - Set newChainSet = new HashSet<>(); - - while(rs.next()) { - String chainName = getStringFromResultSet(rs, chainNameField); - String newData = getStringFromResultSet(rs, elDataField); - String newSHA = DigestUtil.sha1Hex(newData); - newChainSet.add(chainName); - //如果封装的SHAMap中不存在该chain, 表示该chain为新增 - if(!chainSHAMap.containsKey(chainName)) { - //新增chain - LiteFlowChainELBuilder.createChain().setChainId(chainName).setEL(newData).build(); - LOG.info("starting reload flow config... create chain={}, new value={},", chainName, newData); - //加入到shaMap - chainSHAMap.put(chainName, newSHA); - - } - else if (!StrUtil.equals(newSHA, chainSHAMap.get(chainName))) { - //SHA值发生变化,表示该chain的值已被修改,重新拉取变化的chain - //修改chain - LiteFlowChainELBuilder.createChain().setChainId(chainName).setEL(newData).build(); - LOG.info("starting reload flow config... update chain={}, new value={},", chainName, newData); - //修改shaMap - chainSHAMap.put(chainName, newSHA); - } - //SHA值无变化,表示该chain未改变 - } - - if(chainSHAMap.size() > newChainSet.size()) { - //如果遍历prepareStatement后修改过的SHAMap数量比最新chain总数多, 说明有两种情况: - // 1、删除了chain - // 2、修改了chainName:因为遍历到新的name时会加到SHAMap里,但没有机会删除旧的chain - // 3、上述两者结合 - //在此处遍历chainSHAMap,把不在newChainSet中的chain删除 - //这里用iterator是为避免在遍历集合时删除元素导致ConcurrentModificationException - Iterator iterator = chainSHAMap.keySet().iterator(); - while(iterator.hasNext()) { - String chainName = iterator.next(); - if(!newChainSet.contains(chainName)) { - FlowBus.removeChain(chainName); - LOG.info("starting reload flow config... delete chain={}", chainName); - //修改SHAMap - iterator.remove(); - } - } - } - } catch (Exception e) { - LOG.error("[Exception during SQL chain polling] " + e.getMessage(), e); - } finally { - // 关闭连接 - LiteFlowJdbcUtil.close(conn, stmt, rs); - } - } - - private String getStringFromResultSet(ResultSet rs, String field) throws SQLException { - String data = rs.getString(field); - if (StrUtil.isBlank(data)) { - throw new ELSQLException(StrUtil.format("exist {} field value is empty", field)); - } - return data; - } -} diff --git a/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/util/JDBCHelper.java b/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/util/JDBCHelper.java index 8eb6dc4b8..aae5e44f7 100644 --- a/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/util/JDBCHelper.java +++ b/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/util/JDBCHelper.java @@ -5,22 +5,21 @@ import cn.hutool.core.thread.NamedThreadFactory; import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.XmlUtil; -import cn.hutool.crypto.digest.DigestUtil; -import com.yomahub.liteflow.enums.NodeTypeEnum; -import com.yomahub.liteflow.enums.ScriptTypeEnum; -import com.yomahub.liteflow.parser.sql.exception.ELSQLException; -import com.yomahub.liteflow.parser.sql.vo.SQLParserVO; +import com.yomahub.liteflow.parser.constant.ReadType; -import java.sql.Connection; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.SQLException; +import com.yomahub.liteflow.parser.helper.NodeConvertHelper; +import com.yomahub.liteflow.parser.sql.exception.ELSQLException; +import com.yomahub.liteflow.parser.sql.read.SqlRead; +import com.yomahub.liteflow.parser.sql.read.SqlReadFactory; +import com.yomahub.liteflow.parser.sql.vo.SQLParserVO; +import org.apache.commons.lang.StringUtils; import java.util.*; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import static com.yomahub.liteflow.parser.constant.SqlReadConstant.*; /** * jdbc 工具类 * @@ -29,26 +28,6 @@ import java.util.concurrent.TimeUnit; */ public class JDBCHelper { - private static final String SQL_PATTERN = "SELECT {},{} FROM {} WHERE {}=?"; - - private static final String SCRIPT_SQL_CHECK_PATTERN = "SELECT 1 FROM {} WHERE {}=?"; - - private static final String SCRIPT_SQL_PATTERN = "SELECT {},{},{},{} FROM {} WHERE {}=?"; - - private static final String SCRIPT_WITH_LANGUAG_SQL_PATTERN = "SELECT {},{},{},{},{} FROM {} WHERE {}=?"; - - private static final String CHAIN_XML_PATTERN = ""; - - private static final String NODE_XML_PATTERN = "{}"; - - private static final String NODE_ITEM_XML_PATTERN = ""; - - private static final String NODE_ITEM_WITH_LANGUAGE_XML_PATTERN = ""; - - private static final String XML_PATTERN = "{}{}"; - - private static final Integer FETCH_SIZE_MAX = 1000; - private SQLParserVO sqlParserVO; private static JDBCHelper INSTANCE; @@ -59,12 +38,6 @@ public class JDBCHelper { //定时任务线程池 private static ScheduledThreadPoolExecutor pollExecutor; - //chain的SHA1加密值 用于轮询时确定chain是否变化 - private Map chainSHAMap = new HashMap<>(); - - //script的SHA1加密值 用于轮询时确定script是否变化 - private Map scriptSHAMap = new HashMap<>(); - /** * 初始化 INSTANCE */ @@ -75,13 +48,10 @@ public class JDBCHelper { Class.forName(sqlParserVO.getDriverClassName()); } INSTANCE.setSqlParserVO(sqlParserVO); - //创建定时任务线程池 + // 创建定时任务线程池 if (sqlParserVO.getPollingEnabled() && ObjectUtil.isNull(getPollExecutor())) { ThreadFactory namedThreadFactory = new NamedThreadFactory("SQL-Polling-", false); - ScheduledThreadPoolExecutor threadPoolExecutor = new ScheduledThreadPoolExecutor( - CORE_POOL_SIZE, - namedThreadFactory, - new ThreadPoolExecutor.DiscardOldestPolicy()); + ScheduledThreadPoolExecutor threadPoolExecutor = new ScheduledThreadPoolExecutor(CORE_POOL_SIZE, namedThreadFactory, new ThreadPoolExecutor.DiscardOldestPolicy()); setPollExecutor(threadPoolExecutor); } } catch (ClassNotFoundException e) { @@ -100,64 +70,38 @@ public class JDBCHelper { * 获取 ElData 数据内容 */ public String getContent() { - Connection conn = null; - PreparedStatement stmt = null; - ResultSet rs = null; + SqlRead chainRead = SqlReadFactory.getSqlRead(ReadType.CHAIN); + SqlRead scriptRead = SqlReadFactory.getSqlRead(ReadType.SCRIPT); - String chainTableName = sqlParserVO.getChainTableName(); - String elDataField = sqlParserVO.getElDataField(); - String chainNameField = sqlParserVO.getChainNameField(); - String chainApplicationNameField = sqlParserVO.getChainApplicationNameField(); - String applicationName = sqlParserVO.getApplicationName(); + // 获取 chain 数据 + Map chainMap = chainRead.read(); + List chainList = new ArrayList<>(); + chainMap.forEach((chainName, elData) -> { + chainList.add(StrUtil.format(CHAIN_XML_PATTERN, XmlUtil.escape(chainName), elData)); + }); + String chainsContent = CollUtil.join(chainList, StrUtil.EMPTY); - if (StrUtil.isBlank(chainTableName)) { - throw new ELSQLException("You did not define the chainTableName property"); - } + // 获取脚本数据 + Map scriptMap = scriptRead.read(); + List scriptList = new ArrayList<>(); + scriptMap.forEach((scriptKey, elData) -> { + NodeConvertHelper.NodeSimpleVO scriptVO = NodeConvertHelper.convert(scriptKey); + String id = scriptVO.getNodeId(); + String name = scriptVO.getName(); + String type = scriptVO.getType(); + String language = scriptVO.getLanguage(); - if (StrUtil.isBlank(applicationName) || StrUtil.isBlank(chainApplicationNameField)) { - throw new ELSQLException("You did not define the applicationName or chainApplicationNameField property"); - } - - String sqlCmd = StrUtil.format(SQL_PATTERN, chainNameField, elDataField, chainTableName, - chainApplicationNameField); - - List result = new ArrayList<>(); - try { - conn = LiteFlowJdbcUtil.getConn(sqlParserVO); - stmt = conn.prepareStatement(sqlCmd, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); - // 设置游标拉取数量 - stmt.setFetchSize(FETCH_SIZE_MAX); - stmt.setString(1, applicationName); - rs = stmt.executeQuery(); - - while (rs.next()) { - String elData = getStringFromResultSet(rs, elDataField); - String chainName = getStringFromResultSet(rs, chainNameField); - - result.add(StrUtil.format(CHAIN_XML_PATTERN, XmlUtil.escape(chainName), elData)); - - //如果需要轮询 计算该chainData的SHA值 - if(sqlParserVO.getPollingEnabled()){ - String chainSHA = DigestUtil.sha1Hex(elData); - chainSHAMap.put(chainName, chainSHA); - } + if (StringUtils.isNotBlank(scriptVO.getLanguage())) { + scriptList.add(StrUtil.format(NODE_ITEM_WITH_LANGUAGE_XML_PATTERN, XmlUtil.escape(id), XmlUtil.escape(name),type, language, elData)); + } else { + scriptList.add(StrUtil.format(NODE_ITEM_XML_PATTERN, XmlUtil.escape(id), XmlUtil.escape(name), type, elData)); } - } catch (Exception e) { - throw new ELSQLException(e.getMessage()); - } finally { - // 关闭连接 - LiteFlowJdbcUtil.close(conn, stmt, rs); - } - - String chainsContent = CollUtil.join(result, StrUtil.EMPTY); - - String nodesContent; - if (hasScriptData()) { - nodesContent = getScriptNodes(); - } else { - nodesContent = StrUtil.EMPTY; - } + }); + String nodesContent = StrUtil.format(NODE_XML_PATTERN, CollUtil.join(scriptList, StrUtil.EMPTY)); + // 注册 + SqlReadFactory.registerSqlReadPollTask(ReadType.CHAIN, chainMap); + SqlReadFactory.registerSqlReadPollTask(ReadType.SCRIPT, scriptMap); return StrUtil.format(XML_PATTERN, nodesContent, chainsContent); } @@ -165,197 +109,22 @@ public class JDBCHelper { * 定时轮询拉取SQL中变化的数据 */ public void listenSQL() { - //添加轮询chain的定时任务 - ChainPollingTask chainTask = new ChainPollingTask(sqlParserVO, chainSHAMap); - pollExecutor.scheduleAtFixedRate(chainTask, sqlParserVO.getPollingStartSeconds().longValue(), - sqlParserVO.getPollingIntervalSeconds().longValue(), TimeUnit.SECONDS); - if (hasScriptData()) { - //添加轮询script的定时任务 - ScriptPollingTask scriptTask = new ScriptPollingTask(sqlParserVO, scriptSHAMap); - pollExecutor.scheduleAtFixedRate(scriptTask, sqlParserVO.getPollingStartSeconds().longValue(), - sqlParserVO.getPollingIntervalSeconds().longValue(), TimeUnit.SECONDS); - } - } + // 添加轮询chain的定时任务 + pollExecutor.scheduleAtFixedRate( + () -> SqlReadFactory.getSqlReadPollTask(ReadType.CHAIN).execute(), + sqlParserVO.getPollingStartSeconds().longValue(), + sqlParserVO.getPollingIntervalSeconds().longValue(), + TimeUnit.SECONDS + ); + // 添加轮询script的定时任务 + pollExecutor.scheduleAtFixedRate( + () -> SqlReadFactory.getSqlReadPollTask(ReadType.SCRIPT).execute(), + sqlParserVO.getPollingStartSeconds().longValue(), + sqlParserVO.getPollingIntervalSeconds().longValue(), + TimeUnit.SECONDS + ); - /** - * 获取脚本节点内容 - */ - public String getScriptNodes() { - String scriptLanguageField = sqlParserVO.getScriptLanguageField(); - if (StrUtil.isNotBlank(scriptLanguageField)) { - return getScriptNodesWithLanguage(); - } - - List result = new ArrayList<>(); - Connection conn = null; - PreparedStatement stmt = null; - ResultSet rs = null; - - String scriptTableName = sqlParserVO.getScriptTableName(); - String scriptIdField = sqlParserVO.getScriptIdField(); - String scriptDataField = sqlParserVO.getScriptDataField(); - String scriptNameField = sqlParserVO.getScriptNameField(); - String scriptTypeField = sqlParserVO.getScriptTypeField(); - String scriptApplicationNameField = sqlParserVO.getScriptApplicationNameField(); - String applicationName = sqlParserVO.getApplicationName(); - - if (StrUtil.isBlank(applicationName) || StrUtil.isBlank(scriptApplicationNameField)) { - throw new ELSQLException("You did not define the applicationName or scriptApplicationNameField property"); - } - - String sqlCmd = StrUtil.format(SCRIPT_SQL_PATTERN, scriptIdField, scriptDataField, scriptNameField, - scriptTypeField, scriptTableName, scriptApplicationNameField); - try { - conn = LiteFlowJdbcUtil.getConn(sqlParserVO); - stmt = conn.prepareStatement(sqlCmd, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); - // 设置游标拉取数量 - stmt.setFetchSize(FETCH_SIZE_MAX); - stmt.setString(1, applicationName); - rs = stmt.executeQuery(); - - while (rs.next()) { - String id = getStringFromResultSet(rs, scriptIdField); - String data = getStringFromResultSet(rs, scriptDataField); - String name = getStringFromResultSet(rs, scriptNameField); - String type = getStringFromResultSet(rs, scriptTypeField); - - NodeTypeEnum nodeTypeEnum = NodeTypeEnum.getEnumByCode(type); - if (Objects.isNull(nodeTypeEnum)) { - throw new ELSQLException(StrUtil.format("Invalid type value[{}]", type)); - } - - if (!nodeTypeEnum.isScript()) { - throw new ELSQLException(StrUtil.format("The type value[{}] is not a script type", type)); - } - - result.add(StrUtil.format(NODE_ITEM_XML_PATTERN, XmlUtil.escape(id), XmlUtil.escape(name), type, data)); - - //如果需要轮询 计算该scriptData的SHA值 - if(sqlParserVO.getPollingEnabled()){ - String scriptKey = StrUtil.join(":", id, type, name); - String scriptSHA = DigestUtil.sha1Hex(data); - scriptSHAMap.put(scriptKey, scriptSHA); - } - } - } catch (Exception e) { - throw new ELSQLException(e.getMessage()); - } finally { - // 关闭连接 - LiteFlowJdbcUtil.close(conn, stmt, rs); - } - return StrUtil.format(NODE_XML_PATTERN, CollUtil.join(result, StrUtil.EMPTY)); - } - - /** - * 获取脚本节点(带语言) - * - * @return - */ - public String getScriptNodesWithLanguage() { - - List result = new ArrayList<>(); - Connection conn = null; - PreparedStatement stmt = null; - ResultSet rs = null; - - String scriptTableName = sqlParserVO.getScriptTableName(); - String scriptIdField = sqlParserVO.getScriptIdField(); - String scriptDataField = sqlParserVO.getScriptDataField(); - String scriptNameField = sqlParserVO.getScriptNameField(); - String scriptTypeField = sqlParserVO.getScriptTypeField(); - String scriptApplicationNameField = sqlParserVO.getScriptApplicationNameField(); - String applicationName = sqlParserVO.getApplicationName(); - String scriptLanguageField = sqlParserVO.getScriptLanguageField(); - - if (StrUtil.isBlank(applicationName) || StrUtil.isBlank(scriptApplicationNameField)) { - throw new ELSQLException("You did not define the applicationName or scriptApplicationNameField property"); - } - - String sqlCmd = StrUtil.format(SCRIPT_WITH_LANGUAG_SQL_PATTERN, scriptIdField, scriptDataField, scriptNameField, - scriptTypeField, scriptLanguageField, scriptTableName, scriptApplicationNameField); - try { - conn = LiteFlowJdbcUtil.getConn(sqlParserVO); - stmt = conn.prepareStatement(sqlCmd, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); - // 设置游标拉取数量 - stmt.setFetchSize(FETCH_SIZE_MAX); - stmt.setString(1, applicationName); - rs = stmt.executeQuery(); - - while (rs.next()) { - String id = getStringFromResultSet(rs, scriptIdField); - String data = getStringFromResultSet(rs, scriptDataField); - String name = getStringFromResultSet(rs, scriptNameField); - String type = getStringFromResultSet(rs, scriptTypeField); - String language = getStringFromResultSet(rs, scriptLanguageField); - - NodeTypeEnum nodeTypeEnum = NodeTypeEnum.getEnumByCode(type); - if (Objects.isNull(nodeTypeEnum)) { - throw new ELSQLException(StrUtil.format("Invalid type value[{}]", type)); - } - - if (!nodeTypeEnum.isScript()) { - throw new ELSQLException(StrUtil.format("The type value[{}] is not a script type", type)); - } - - if (!ScriptTypeEnum.checkScriptType(language)) { - throw new ELSQLException(StrUtil.format("The language value[{}] is invalid", language)); - } - - result.add(StrUtil.format(NODE_ITEM_WITH_LANGUAGE_XML_PATTERN, XmlUtil.escape(id), XmlUtil.escape(name), - type, language, data)); - - //如果需要轮询 计算该scriptData的SHA值 - if(sqlParserVO.getPollingEnabled()){ - String scriptKey = StrUtil.join(":", id, type, name, language); - String scriptSHA = DigestUtil.sha1Hex(data); - scriptSHAMap.put(scriptKey, scriptSHA); - } - } - } catch (Exception e) { - throw new ELSQLException(e.getMessage()); - } finally { - // 关闭连接 - LiteFlowJdbcUtil.close(conn, stmt, rs); - } - return StrUtil.format(NODE_XML_PATTERN, CollUtil.join(result, StrUtil.EMPTY)); - } - - private boolean hasScriptData() { - if (StrUtil.isBlank(sqlParserVO.getScriptTableName())) { - return false; - } - - Connection conn = null; - PreparedStatement stmt = null; - ResultSet rs = null; - String sqlCmd = StrUtil.format(SCRIPT_SQL_CHECK_PATTERN, sqlParserVO.getScriptTableName(), - sqlParserVO.getScriptApplicationNameField()); - try { - conn = LiteFlowJdbcUtil.getConn(sqlParserVO); - stmt = conn.prepareStatement(sqlCmd, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); - stmt.setFetchSize(1); - stmt.setString(1, sqlParserVO.getApplicationName()); - rs = stmt.executeQuery(); - return rs.next(); - } catch (Exception e) { - return false; - } finally { - // 关闭连接 - LiteFlowJdbcUtil.close(conn, stmt, rs); - } - } - - private String getStringFromResultSet(ResultSet rs, String field) throws SQLException { - String data = rs.getString(field); - if (StrUtil.isBlank(data)) { - throw new ELSQLException(StrUtil.format("exist {} field value is empty", field)); - } - return data; - } - - private SQLParserVO getSqlParserVO() { - return sqlParserVO; } private void setSqlParserVO(SQLParserVO sqlParserVO) { diff --git a/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/util/ScriptPollingTask.java b/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/util/ScriptPollingTask.java deleted file mode 100644 index 5a68e6f7f..000000000 --- a/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/util/ScriptPollingTask.java +++ /dev/null @@ -1,145 +0,0 @@ -package com.yomahub.liteflow.parser.sql.util; - -import cn.hutool.core.util.StrUtil; -import cn.hutool.crypto.digest.DigestUtil; -import com.yomahub.liteflow.flow.FlowBus; -import com.yomahub.liteflow.log.LFLog; -import com.yomahub.liteflow.log.LFLoggerManager; -import com.yomahub.liteflow.parser.helper.NodeConvertHelper; -import com.yomahub.liteflow.parser.sql.exception.ELSQLException; -import com.yomahub.liteflow.parser.sql.vo.SQLParserVO; - -import java.sql.Connection; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.util.*; - -/** - * 用于轮询script的定时任务 - * - * @author hxinyu - * @since 2.11.1 - */ -public class ScriptPollingTask implements Runnable { - - private static final String SQL_PATTERN = "SELECT {},{} FROM {} WHERE {}=?"; - - private static final String CONCAT_PATTERN = "CONCAT_WS(':',{},{},{}) as script_concat"; - - private static final String CONCAT_WITH_LANGUAGE_PATTERN = "CONCAT_WS(':',{},{},{},{}) as script_concat"; - - private static final String SCRIPT_KEY_FIELD = "script_concat"; - - private Connection conn; - - private SQLParserVO sqlParserVO; - - private Map scriptSHAMap; - - private static final Integer FETCH_SIZE_MAX = 1000; - - LFLog LOG = LFLoggerManager.getLogger(ScriptPollingTask.class); - - public ScriptPollingTask(SQLParserVO sqlParserVO, Map scriptSHAMap) { - this.sqlParserVO = sqlParserVO; - this.scriptSHAMap = scriptSHAMap; - } - - - @Override - public void run() { - conn = LiteFlowJdbcUtil.getConn(sqlParserVO); - PreparedStatement stmt = null; - ResultSet rs = null; - try { - String scriptTableName = sqlParserVO.getScriptTableName(); - String scriptIdField = sqlParserVO.getScriptIdField(); - String scriptDataField = sqlParserVO.getScriptDataField(); - String scriptNameField = sqlParserVO.getScriptNameField(); - String scriptTypeField = sqlParserVO.getScriptTypeField(); - String scriptApplicationNameField = sqlParserVO.getScriptApplicationNameField(); - String applicationName = sqlParserVO.getApplicationName(); - String scriptLanguageField = sqlParserVO.getScriptLanguageField(); - - String KeyField; - if (StrUtil.isNotBlank(scriptLanguageField)) { - KeyField = StrUtil.format(CONCAT_WITH_LANGUAGE_PATTERN, scriptIdField, scriptTypeField, scriptNameField, scriptLanguageField); - } else { - KeyField = StrUtil.format(CONCAT_PATTERN, scriptIdField, scriptTypeField, scriptNameField); - } - - String sqlCmd = StrUtil.format(SQL_PATTERN, KeyField, scriptDataField, scriptTableName, scriptApplicationNameField); - stmt = conn.prepareStatement(sqlCmd, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); - // 设置游标拉取数量 - stmt.setFetchSize(FETCH_SIZE_MAX); - stmt.setString(1, applicationName); - rs = stmt.executeQuery(); - - Set newScriptSet = new HashSet<>(); - - while (rs.next()) { - String scriptKey = getStringFromResultSet(rs, SCRIPT_KEY_FIELD); - String newData = getStringFromResultSet(rs, scriptDataField); - String newSHA = DigestUtil.sha1Hex(newData); - newScriptSet.add(scriptKey); - //如果封装的SHAMap中不存在该script 表示该script为新增 - if (!scriptSHAMap.containsKey(scriptKey)) { - NodeConvertHelper.NodeSimpleVO scriptVO = NodeConvertHelper.convert(scriptKey); - //新增script - NodeConvertHelper.changeScriptNode(scriptVO, newData); - LOG.info("starting reload flow config... create script={}, new value={},", scriptKey, newData); - - //加入到shaMap - scriptSHAMap.put(scriptKey, newSHA); - } - else if (!StrUtil.equals(newSHA, scriptSHAMap.get(scriptKey))) { - //SHA值发生变化,表示该script的值已被修改,重新拉取变化的script - NodeConvertHelper.NodeSimpleVO scriptVO = NodeConvertHelper.convert(scriptKey); - //修改script - NodeConvertHelper.changeScriptNode(scriptVO, newData); - LOG.info("starting reload flow config... update scriptId={}, new value={},", scriptVO.getNodeId(), newData); - - //修改shaMap - scriptSHAMap.put(scriptKey, newSHA); - } - //SHA值无变化,表示该chain未改变 - } - - if(scriptSHAMap.size() > newScriptSet.size()) { - //如果遍历prepareStatement后修改过的SHAMap数量比最新script总数多, 说明有两种情况: - // 1、删除了script - // 2、修改了script的id/name/type:因为遍历到新的script_key时会加到SHAMap里,但没有机会删除旧的script - // 3、上述两者结合 - //在此处遍历scriptSHAMap,把不在newScriptSet中的script删除 - //这里用iterator是为避免在遍历集合时删除元素导致ConcurrentModificationException - Iterator iterator = scriptSHAMap.keySet().iterator(); - while(iterator.hasNext()){ - String scriptKey = iterator.next(); - if (!newScriptSet.contains(scriptKey)) { - NodeConvertHelper.NodeSimpleVO scriptVO = NodeConvertHelper.convert(scriptKey); - //删除script - FlowBus.getNodeMap().remove(scriptVO.getNodeId()); - LOG.info("starting reload flow config... delete script={}", scriptKey); - //修改SHAMap - iterator.remove(); - } - } - } - - } catch (Exception e) { - LOG.error("[Exception during SQL script polling] " + e.getMessage(), e); - } finally { - // 关闭连接 - LiteFlowJdbcUtil.close(conn, stmt, rs); - } - } - - private String getStringFromResultSet(ResultSet rs, String field) throws SQLException { - String data = rs.getString(field); - if (StrUtil.isBlank(data)) { - throw new ELSQLException(StrUtil.format("exist {} field value is empty", field)); - } - return data; - } -} diff --git a/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/vo/SQLParserVO.java b/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/vo/SQLParserVO.java index b15d68b72..8824328dd 100644 --- a/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/vo/SQLParserVO.java +++ b/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/vo/SQLParserVO.java @@ -90,15 +90,26 @@ public class SQLParserVO { */ private String scriptLanguageField; - /*轮询机制是否开启 默认不开启*/ + /** + * 轮询机制是否开启 默认不开启 + */ private Boolean pollingEnabled = false; - /*轮询时间间隔(s) 默认120s*/ + /** + * 轮询时间间隔(s) 默认60s + */ private Integer pollingIntervalSeconds = 60; - /*规则配置后首次轮询的起始时间 默认为60s*/ + /** + * 规则配置后首次轮询的起始时间 默认为60s + */ private Integer pollingStartSeconds = 60; + /** + * 是否开启sql日志 + */ + private Boolean sqlLogEnabled = true; + public String getUrl() { return url; } @@ -257,4 +268,12 @@ public class SQLParserVO { public void setPollingStartSeconds(Integer pollingStartSeconds) { this.pollingStartSeconds = pollingStartSeconds; } + + public Boolean getSqlLogEnabled() { + return sqlLogEnabled; + } + + public void setSqlLogEnabled(Boolean sqlLogEnabled) { + this.sqlLogEnabled = sqlLogEnabled; + } }