mirror of
https://gitee.com/dromara/liteFlow.git
synced 2026-05-14 20:22:07 +08:00
sql 插件重构
This commit is contained in:
@@ -0,0 +1,14 @@
|
||||
package com.yomahub.liteflow.parser.constant;
|
||||
|
||||
/**
|
||||
* Copyright (C), 2021, 北京同创永益科技发展有限公司
|
||||
*
|
||||
* @author tangkc
|
||||
* @version 3.0.0
|
||||
* @description
|
||||
* @date 2023/9/28 11:43
|
||||
*/
|
||||
public enum ReadType {
|
||||
CHAIN,
|
||||
SCRIPT;
|
||||
}
|
||||
@@ -0,0 +1,32 @@
|
||||
package com.yomahub.liteflow.parser.constant;
|
||||
|
||||
/**
|
||||
* Copyright (C), 2021, 北京同创永益科技发展有限公司
|
||||
*
|
||||
* @author tangkc
|
||||
* @version 3.0.0
|
||||
* @description
|
||||
* @date 2023/9/28 11:42
|
||||
*/
|
||||
public class SqlReadConstant {
|
||||
|
||||
public static final String SQL_PATTERN = "SELECT {},{} FROM {} WHERE {}=?";
|
||||
|
||||
public static final String SCRIPT_SQL_CHECK_PATTERN = "SELECT 1 FROM {} ";
|
||||
|
||||
public static final String SCRIPT_SQL_PATTERN = "SELECT {},{},{},{} FROM {} WHERE {}=?";
|
||||
|
||||
public static final String SCRIPT_WITH_LANGUAGE_SQL_PATTERN = "SELECT {},{},{},{},{} FROM {} WHERE {}=?";
|
||||
|
||||
public static final String CHAIN_XML_PATTERN = "<chain name=\"{}\"><![CDATA[{}]]></chain>";
|
||||
|
||||
public static final String NODE_XML_PATTERN = "<nodes>{}</nodes>";
|
||||
|
||||
public static final String NODE_ITEM_XML_PATTERN = "<node id=\"{}\" name=\"{}\" type=\"{}\"><![CDATA[{}]]></node>";
|
||||
|
||||
public static final String NODE_ITEM_WITH_LANGUAGE_XML_PATTERN = "<node id=\"{}\" name=\"{}\" type=\"{}\" language=\"{}\"><![CDATA[{}]]></node>";
|
||||
|
||||
public static final String XML_PATTERN = "<?xml version=\"1.0\" encoding=\"UTF-8\"?><flow>{}{}</flow>";
|
||||
|
||||
public static final Integer FETCH_SIZE_MAX = 1000;
|
||||
}
|
||||
@@ -8,6 +8,7 @@ import cn.hutool.core.util.StrUtil;
|
||||
import com.yomahub.liteflow.core.FlowInitHook;
|
||||
import com.yomahub.liteflow.parser.el.ClassXmlFlowELParser;
|
||||
import com.yomahub.liteflow.parser.sql.exception.ELSQLException;
|
||||
import com.yomahub.liteflow.parser.sql.read.SqlReadFactory;
|
||||
import com.yomahub.liteflow.parser.sql.util.JDBCHelper;
|
||||
import com.yomahub.liteflow.parser.sql.vo.SQLParserVO;
|
||||
import com.yomahub.liteflow.property.LiteflowConfig;
|
||||
@@ -53,6 +54,9 @@ public class SQLXmlELParser extends ClassXmlFlowELParser {
|
||||
|
||||
// 初始化 JDBCHelper
|
||||
JDBCHelper.init(sqlParserVO);
|
||||
|
||||
// 初始化 SqlReadFactory
|
||||
SqlReadFactory.registerRead(sqlParserVO);
|
||||
}
|
||||
catch (ELSQLException elsqlException) {
|
||||
throw elsqlException;
|
||||
|
||||
@@ -0,0 +1,98 @@
|
||||
package com.yomahub.liteflow.parser.sql.polling;
|
||||
|
||||
import cn.hutool.core.collection.CollUtil;
|
||||
import cn.hutool.core.map.MapUtil;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import cn.hutool.crypto.digest.DigestUtil;
|
||||
import com.yomahub.liteflow.parser.sql.exception.ELSQLException;
|
||||
import com.yomahub.liteflow.parser.sql.read.SqlRead;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* Copyright (C), 2021, 北京同创永益科技发展有限公司
|
||||
*
|
||||
* @author tangkc
|
||||
* @version 3.0.0
|
||||
* @description
|
||||
* @date 2023/9/28 14:12
|
||||
*/
|
||||
public abstract class AbstractSqlReadPollTask implements SqlReadPollTask {
|
||||
private final Map<String/*唯一键*/, String/*data-xml的sha1值*/> DATA_SHA_MAP;
|
||||
private final SqlRead read;
|
||||
|
||||
public AbstractSqlReadPollTask(Map<String, String> dataMap, SqlRead read) {
|
||||
this.read = read;
|
||||
this.DATA_SHA_MAP = shaMapValue(dataMap);
|
||||
|
||||
if (!read.type().equals(type())) {
|
||||
throw new ELSQLException("SqlReadPollTask type not match");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void execute() {
|
||||
Map<String/*唯一键*/, String/*data-xml*/> newData = read.read();
|
||||
// 新增或者更新的元素
|
||||
Map<String, String> saveElementMap = new HashMap();
|
||||
// 删除的元素
|
||||
List<String> deleteElementIds = new ArrayList<>();
|
||||
|
||||
for (Map.Entry<String, String> entry : newData.entrySet()) {
|
||||
String id = entry.getKey();
|
||||
String element = entry.getValue();
|
||||
String newSHA = DigestUtil.sha1Hex(element);
|
||||
|
||||
// 新增
|
||||
// 如果封装的SHAMap中不存在该chain, 表示该元素为新增
|
||||
if (!DATA_SHA_MAP.containsKey(id)) {
|
||||
saveElementMap.put(id, element);
|
||||
|
||||
DATA_SHA_MAP.put(id, newSHA);
|
||||
}
|
||||
// 修改
|
||||
// SHA值发生变化,表示该元素的值已被修改,重新拉取变化的chain
|
||||
else if (!StrUtil.equals(newSHA, DATA_SHA_MAP.get(id))) {
|
||||
saveElementMap.put(id, element);
|
||||
|
||||
DATA_SHA_MAP.put(id, newSHA);
|
||||
}
|
||||
}
|
||||
|
||||
Set<String> oldIdList = DATA_SHA_MAP.keySet(); // 旧的 id 列表
|
||||
Set<String> newIdList = newData.keySet(); // 新的 id 列表
|
||||
// 计算单差集
|
||||
// 计算集合的单差集,即只返回【newIdList】中有,但是【oldIdList】中没有的元素,例如:
|
||||
// subtractToList([1,2,3,4],[2,3,4,5]) -》 [1]
|
||||
deleteElementIds = CollUtil.subtractToList(newIdList, oldIdList);
|
||||
|
||||
for (String id : deleteElementIds) {
|
||||
DATA_SHA_MAP.remove(id);
|
||||
}
|
||||
|
||||
if (CollUtil.isNotEmpty(saveElementMap)) {
|
||||
doSave(saveElementMap);
|
||||
}
|
||||
|
||||
if (CollUtil.isNotEmpty(deleteElementIds)) {
|
||||
doDelete(deleteElementIds);
|
||||
}
|
||||
}
|
||||
|
||||
public abstract void doSave(Map<String, String> saveElementMap);
|
||||
|
||||
public abstract void doDelete(List<String> deleteElementId);
|
||||
|
||||
private Map<String/*唯一键*/, String/*data-xml的sha1值*/> shaMapValue(Map<String, String> dataMap) {
|
||||
Map<String, String> result = new HashMap<>();
|
||||
dataMap.forEach((k, v) -> {
|
||||
result.put(k, DigestUtil.sha1Hex(v));
|
||||
});
|
||||
|
||||
return result;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,18 @@
|
||||
package com.yomahub.liteflow.parser.sql.polling;
|
||||
|
||||
import com.yomahub.liteflow.parser.constant.ReadType;
|
||||
|
||||
/**
|
||||
* Copyright (C), 2021, 北京同创永益科技发展有限公司
|
||||
*
|
||||
* @author tangkc
|
||||
* @version 3.0.0
|
||||
* @description
|
||||
* @date 2023/9/28 14:10
|
||||
*/
|
||||
public interface SqlReadPollTask {
|
||||
|
||||
void execute();
|
||||
|
||||
ReadType type();
|
||||
}
|
||||
@@ -0,0 +1,56 @@
|
||||
package com.yomahub.liteflow.parser.sql.polling.impl;
|
||||
|
||||
import com.yomahub.liteflow.builder.el.LiteFlowChainELBuilder;
|
||||
import com.yomahub.liteflow.flow.FlowBus;
|
||||
import com.yomahub.liteflow.parser.constant.ReadType;
|
||||
import com.yomahub.liteflow.parser.helper.ParserHelper;
|
||||
import com.yomahub.liteflow.parser.sql.polling.AbstractSqlReadPollTask;
|
||||
import com.yomahub.liteflow.parser.sql.polling.SqlReadPollTask;
|
||||
import com.yomahub.liteflow.parser.sql.read.SqlRead;
|
||||
import org.dom4j.Document;
|
||||
import org.dom4j.DocumentHelper;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
|
||||
import static com.yomahub.liteflow.common.ChainConstant.ID;
|
||||
import static com.yomahub.liteflow.common.ChainConstant.NAME;
|
||||
|
||||
/**
|
||||
* Copyright (C), 2021, 北京同创永益科技发展有限公司
|
||||
*
|
||||
* @author tangkc
|
||||
* @version 3.0.0
|
||||
* @description
|
||||
* @date 2023/9/28 14:46
|
||||
*/
|
||||
public class ChainReadPollTask extends AbstractSqlReadPollTask {
|
||||
|
||||
public ChainReadPollTask(Map<String, String> dataMap, SqlRead read) {
|
||||
super(dataMap, read);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void doSave(Map<String, String> saveElementMap) {
|
||||
for (Map.Entry<String, String> entry : saveElementMap.entrySet()) {
|
||||
String chainName = entry.getKey();
|
||||
String newData = entry.getValue();
|
||||
|
||||
LiteFlowChainELBuilder.createChain().setChainId(chainName).setEL(newData).build();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void doDelete(List<String> deleteElementId) {
|
||||
for (String id : deleteElementId) {
|
||||
FlowBus.removeChain(id);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ReadType type() {
|
||||
return ReadType.CHAIN;
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,50 @@
|
||||
package com.yomahub.liteflow.parser.sql.polling.impl;
|
||||
|
||||
import com.yomahub.liteflow.flow.FlowBus;
|
||||
import com.yomahub.liteflow.parser.constant.ReadType;
|
||||
import com.yomahub.liteflow.parser.helper.NodeConvertHelper;
|
||||
import com.yomahub.liteflow.parser.sql.polling.AbstractSqlReadPollTask;
|
||||
import com.yomahub.liteflow.parser.sql.read.SqlRead;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Copyright (C), 2021, 北京同创永益科技发展有限公司
|
||||
*
|
||||
* @author tangkc
|
||||
* @version 3.0.0
|
||||
* @description
|
||||
* @date 2023/9/28 15:03
|
||||
*/
|
||||
public class ScriptReadPollTask extends AbstractSqlReadPollTask {
|
||||
public ScriptReadPollTask(Map<String, String> dataMap, SqlRead read) {
|
||||
super(dataMap, read);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void doSave(Map<String, String> saveElementMap) {
|
||||
for (Map.Entry<String, String> entry : saveElementMap.entrySet()) {
|
||||
String scriptKey = entry.getKey();
|
||||
String newData = entry.getValue();
|
||||
|
||||
NodeConvertHelper.NodeSimpleVO scriptVO = NodeConvertHelper.convert(scriptKey);
|
||||
NodeConvertHelper.changeScriptNode(scriptVO, newData);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void doDelete(List<String> deleteElementId) {
|
||||
for (String id : deleteElementId) {
|
||||
NodeConvertHelper.NodeSimpleVO scriptVO = NodeConvertHelper.convert(id);
|
||||
|
||||
// 删除script
|
||||
FlowBus.getNodeMap().remove(scriptVO.getNodeId());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ReadType type() {
|
||||
return ReadType.SCRIPT;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,98 @@
|
||||
package com.yomahub.liteflow.parser.sql.read;
|
||||
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import com.yomahub.liteflow.log.LFLog;
|
||||
import com.yomahub.liteflow.log.LFLoggerManager;
|
||||
import com.yomahub.liteflow.parser.constant.SqlReadConstant;
|
||||
import com.yomahub.liteflow.parser.sql.exception.ELSQLException;
|
||||
import com.yomahub.liteflow.parser.sql.read.impl.ScriptRead;
|
||||
import com.yomahub.liteflow.parser.sql.util.LiteFlowJdbcUtil;
|
||||
import com.yomahub.liteflow.parser.sql.vo.SQLParserVO;
|
||||
|
||||
import java.sql.Connection;
|
||||
import java.sql.PreparedStatement;
|
||||
import java.sql.ResultSet;
|
||||
import java.sql.SQLException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Copyright (C), 2021, 北京同创永益科技发展有限公司
|
||||
*
|
||||
* @author tangkc
|
||||
* @version 3.0.0
|
||||
* @description
|
||||
* @date 2023/9/28 11:26
|
||||
*/
|
||||
public abstract class AbstractSqlRead implements SqlRead {
|
||||
public final SQLParserVO config;
|
||||
private static LFLog LOG = LFLoggerManager.getLogger(AbstractSqlRead.class);
|
||||
|
||||
public AbstractSqlRead(SQLParserVO config) {
|
||||
this.config = config;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String/*规则唯一键*/, String/*规则内容*/> read() {
|
||||
// 如果不需要读取直接返回
|
||||
if (!needRead()) {
|
||||
return new HashMap<>();
|
||||
}
|
||||
|
||||
Map<String/*规则唯一键*/, String/*规则*/> result = new HashMap<>();
|
||||
String sqlCmd = buildQuerySql();
|
||||
if (config.getSqlLogEnabled()) {
|
||||
LOG.info("query sql:{}", sqlCmd.replace("?", "'" + config.getApplicationName() + "'"));
|
||||
}
|
||||
|
||||
Connection conn = null;
|
||||
PreparedStatement stmt = null;
|
||||
ResultSet rs = null;
|
||||
try {
|
||||
conn = LiteFlowJdbcUtil.getConn(config);
|
||||
stmt = conn.prepareStatement(sqlCmd, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
|
||||
// 设置游标拉取数量
|
||||
stmt.setFetchSize(SqlReadConstant.FETCH_SIZE_MAX);
|
||||
stmt.setString(1, config.getApplicationName());
|
||||
rs = stmt.executeQuery();
|
||||
|
||||
while (rs.next()) {
|
||||
String xml = buildXmlElement(rs);
|
||||
String uniqueKey = buildXmlElementUniqueKey(rs);
|
||||
|
||||
result.put(uniqueKey, xml);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw new ELSQLException(e.getMessage());
|
||||
} finally {
|
||||
// 关闭连接
|
||||
LiteFlowJdbcUtil.close(conn, stmt, rs);
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
public abstract String buildQuerySql();
|
||||
|
||||
public abstract String buildXmlElement(ResultSet rs) throws SQLException;
|
||||
|
||||
public abstract String buildXmlElementUniqueKey(ResultSet rs) throws SQLException;
|
||||
|
||||
/**
|
||||
* 是否可以读取
|
||||
* chain 默认可以读取
|
||||
* script 需要判断是否有配置
|
||||
*/
|
||||
public boolean needRead() {
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
public String getStringFromResultSet(ResultSet rs, String field) throws SQLException {
|
||||
String data = rs.getString(field);
|
||||
if (StrUtil.isBlank(data)) {
|
||||
throw new ELSQLException(StrUtil.format("exist {} field value is empty", field));
|
||||
}
|
||||
return data;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,20 @@
|
||||
package com.yomahub.liteflow.parser.sql.read;
|
||||
|
||||
import com.yomahub.liteflow.parser.constant.ReadType;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Copyright (C), 2021, 北京同创永益科技发展有限公司
|
||||
*
|
||||
* @author tangkc
|
||||
* @version 3.0.0
|
||||
* @description
|
||||
* @date 2023/9/28 11:25
|
||||
*/
|
||||
public interface SqlRead {
|
||||
|
||||
Map<String/*规则唯一键*/, String/*规则内容*/> read();
|
||||
|
||||
ReadType type();
|
||||
}
|
||||
@@ -0,0 +1,48 @@
|
||||
package com.yomahub.liteflow.parser.sql.read;
|
||||
|
||||
import com.yomahub.liteflow.parser.constant.ReadType;
|
||||
import com.yomahub.liteflow.parser.sql.polling.SqlReadPollTask;
|
||||
import com.yomahub.liteflow.parser.sql.polling.impl.ChainReadPollTask;
|
||||
import com.yomahub.liteflow.parser.sql.polling.impl.ScriptReadPollTask;
|
||||
import com.yomahub.liteflow.parser.sql.read.impl.ChainRead;
|
||||
import com.yomahub.liteflow.parser.sql.read.impl.ScriptRead;
|
||||
import com.yomahub.liteflow.parser.sql.vo.SQLParserVO;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Copyright (C), 2021, 北京同创永益科技发展有限公司
|
||||
*
|
||||
* @author tangkc
|
||||
* @version 3.0.0
|
||||
* @description
|
||||
* @date 2023/9/28 15:42
|
||||
*/
|
||||
public class SqlReadFactory {
|
||||
private static final Map<ReadType, SqlRead> READ_MAP = new HashMap<>();
|
||||
private static final Map<ReadType, SqlReadPollTask> POLL_TASK_MAP = new HashMap<>();
|
||||
|
||||
public static void registerRead(SQLParserVO config) {
|
||||
READ_MAP.put(ReadType.CHAIN, new ChainRead(config));
|
||||
READ_MAP.put(ReadType.SCRIPT, new ScriptRead(config));
|
||||
}
|
||||
|
||||
public static void registerSqlReadPollTask(ReadType readType, Map<String, String> dataMap) {
|
||||
SqlRead sqlRead = getSqlRead(readType);
|
||||
if (ReadType.CHAIN.equals(readType)) {
|
||||
POLL_TASK_MAP.put(ReadType.CHAIN, new ChainReadPollTask(dataMap, sqlRead));
|
||||
} else if (ReadType.SCRIPT.equals(readType)) {
|
||||
POLL_TASK_MAP.put(ReadType.SCRIPT, new ScriptReadPollTask(dataMap, sqlRead));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public static SqlRead getSqlRead(ReadType readType) {
|
||||
return READ_MAP.get(readType);
|
||||
}
|
||||
|
||||
public static SqlReadPollTask getSqlReadPollTask(ReadType readType) {
|
||||
return POLL_TASK_MAP.get(readType);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,68 @@
|
||||
package com.yomahub.liteflow.parser.sql.read.impl;
|
||||
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import cn.hutool.core.util.XmlUtil;
|
||||
import com.yomahub.liteflow.parser.constant.ReadType;
|
||||
import com.yomahub.liteflow.parser.constant.SqlReadConstant;
|
||||
import com.yomahub.liteflow.parser.sql.exception.ELSQLException;
|
||||
import com.yomahub.liteflow.parser.sql.read.AbstractSqlRead;
|
||||
import com.yomahub.liteflow.parser.sql.vo.SQLParserVO;
|
||||
|
||||
import java.sql.ResultSet;
|
||||
import java.sql.SQLException;
|
||||
|
||||
/**
|
||||
* Copyright (C), 2021, 北京同创永益科技发展有限公司
|
||||
*
|
||||
* @author tangkc
|
||||
* @version 3.0.0
|
||||
* @description
|
||||
* @date 2023/9/28 11:44
|
||||
*/
|
||||
public class ChainRead extends AbstractSqlRead {
|
||||
|
||||
public ChainRead(SQLParserVO config) {
|
||||
super(config);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String buildQuerySql() {
|
||||
String chainTableName = super.config.getChainTableName();
|
||||
String elDataField = super.config.getElDataField();
|
||||
String chainNameField = super.config.getChainNameField();
|
||||
String chainApplicationNameField = super.config.getChainApplicationNameField();
|
||||
String applicationName = super.config.getApplicationName();
|
||||
|
||||
if (StrUtil.isBlank(chainTableName)) {
|
||||
throw new ELSQLException("You did not define the chainTableName property");
|
||||
}
|
||||
|
||||
if (StrUtil.isBlank(applicationName) || StrUtil.isBlank(chainApplicationNameField)) {
|
||||
throw new ELSQLException("You did not define the applicationName or chainApplicationNameField property");
|
||||
}
|
||||
|
||||
String sqlCmd = StrUtil.format(SqlReadConstant.SQL_PATTERN, chainNameField, elDataField, chainTableName,
|
||||
chainApplicationNameField);
|
||||
|
||||
return sqlCmd;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String buildXmlElement(ResultSet rs) throws SQLException {
|
||||
String elDataField = super.config.getElDataField();
|
||||
|
||||
return getStringFromResultSet(rs, elDataField);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String buildXmlElementUniqueKey(ResultSet rs) throws SQLException {
|
||||
String chainNameField = super.config.getChainNameField();
|
||||
|
||||
return getStringFromResultSet(rs, chainNameField);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ReadType type() {
|
||||
return ReadType.CHAIN;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,151 @@
|
||||
package com.yomahub.liteflow.parser.sql.read.impl;
|
||||
|
||||
import cn.hutool.core.collection.CollUtil;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import cn.hutool.core.util.XmlUtil;
|
||||
import com.yomahub.liteflow.enums.NodeTypeEnum;
|
||||
import com.yomahub.liteflow.enums.ScriptTypeEnum;
|
||||
import com.yomahub.liteflow.log.LFLog;
|
||||
import com.yomahub.liteflow.log.LFLoggerManager;
|
||||
import com.yomahub.liteflow.parser.constant.ReadType;
|
||||
import com.yomahub.liteflow.parser.constant.SqlReadConstant;
|
||||
import com.yomahub.liteflow.parser.sql.exception.ELSQLException;
|
||||
import com.yomahub.liteflow.parser.sql.read.AbstractSqlRead;
|
||||
import com.yomahub.liteflow.parser.sql.util.LiteFlowJdbcUtil;
|
||||
import com.yomahub.liteflow.parser.sql.vo.SQLParserVO;
|
||||
|
||||
import java.sql.Connection;
|
||||
import java.sql.ResultSet;
|
||||
import java.sql.SQLException;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* Copyright (C), 2021, 北京同创永益科技发展有限公司
|
||||
*
|
||||
* @author tangkc
|
||||
* @version 3.0.0
|
||||
* @description
|
||||
* @date 2023/9/28 11:49
|
||||
*/
|
||||
public class ScriptRead extends AbstractSqlRead {
|
||||
|
||||
public ScriptRead(SQLParserVO config) {
|
||||
super(config);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String buildQuerySql() {
|
||||
String scriptLanguageField = super.config.getScriptLanguageField();
|
||||
String scriptTableName = super.config.getScriptTableName();
|
||||
String scriptIdField = super.config.getScriptIdField();
|
||||
String scriptDataField = super.config.getScriptDataField();
|
||||
String scriptNameField = super.config.getScriptNameField();
|
||||
String scriptTypeField = super.config.getScriptTypeField();
|
||||
String scriptApplicationNameField = super.config.getScriptApplicationNameField();
|
||||
String applicationName = super.config.getApplicationName();
|
||||
|
||||
if (StrUtil.isBlank(applicationName) || StrUtil.isBlank(scriptApplicationNameField)) {
|
||||
throw new ELSQLException("You did not define the applicationName or scriptApplicationNameField property");
|
||||
}
|
||||
|
||||
String sqlCmd = null;
|
||||
// 脚本节点(带语言)
|
||||
if (withLanguage()) {
|
||||
sqlCmd = StrUtil.format(
|
||||
SqlReadConstant.SCRIPT_WITH_LANGUAGE_SQL_PATTERN,
|
||||
scriptIdField,
|
||||
scriptDataField,
|
||||
scriptNameField,
|
||||
scriptTypeField,
|
||||
scriptLanguageField,
|
||||
scriptTableName,
|
||||
scriptApplicationNameField
|
||||
);
|
||||
|
||||
return sqlCmd;
|
||||
}
|
||||
// 脚本节点(不带语言)
|
||||
else {
|
||||
sqlCmd = StrUtil.format(
|
||||
SqlReadConstant.SCRIPT_SQL_PATTERN,
|
||||
scriptIdField,
|
||||
scriptDataField,
|
||||
scriptNameField,
|
||||
scriptTypeField,
|
||||
scriptTableName,
|
||||
scriptApplicationNameField
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
return sqlCmd;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String buildXmlElement(ResultSet rs) throws SQLException {
|
||||
String scriptDataField = super.config.getScriptDataField();
|
||||
|
||||
return getStringFromResultSet(rs, scriptDataField);
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public String buildXmlElementUniqueKey(ResultSet rs) throws SQLException {
|
||||
String scriptIdField = super.config.getScriptIdField();
|
||||
String scriptNameField = super.config.getScriptNameField();
|
||||
String scriptTypeField = super.config.getScriptTypeField();
|
||||
String scriptLanguageField = super.config.getScriptLanguageField();
|
||||
|
||||
String id = getStringFromResultSet(rs, scriptIdField);
|
||||
String name = getStringFromResultSet(rs, scriptNameField);
|
||||
String type = getStringFromResultSet(rs, scriptTypeField);
|
||||
String language = withLanguage() ? getStringFromResultSet(rs, scriptLanguageField) : null;
|
||||
|
||||
NodeTypeEnum nodeTypeEnum = NodeTypeEnum.getEnumByCode(type);
|
||||
if (Objects.isNull(nodeTypeEnum)) {
|
||||
throw new ELSQLException(StrUtil.format("Invalid type value[{}]", type));
|
||||
}
|
||||
|
||||
if (!nodeTypeEnum.isScript()) {
|
||||
throw new ELSQLException(StrUtil.format("The type value[{}] is not a script type", type));
|
||||
}
|
||||
|
||||
if (withLanguage() && !ScriptTypeEnum.checkScriptType(language)) {
|
||||
throw new ELSQLException(StrUtil.format("The language value[{}] is invalid", language));
|
||||
}
|
||||
List<String> keys = CollUtil.newArrayList(id, type, name);
|
||||
if (StrUtil.isNotBlank(language)) {
|
||||
keys.add(language);
|
||||
}
|
||||
|
||||
return StrUtil.join(StrUtil.COLON, keys);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean needRead() {
|
||||
if (StrUtil.isBlank(super.config.getScriptTableName())) {
|
||||
return false;
|
||||
}
|
||||
|
||||
String sqlCmd = StrUtil.format(
|
||||
SqlReadConstant.SCRIPT_SQL_CHECK_PATTERN,
|
||||
super.config.getScriptTableName()
|
||||
);
|
||||
|
||||
Connection conn = LiteFlowJdbcUtil.getConn(super.config);
|
||||
return LiteFlowJdbcUtil.checkConnectionCanExecuteSql(conn, sqlCmd);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ReadType type() {
|
||||
return ReadType.SCRIPT;
|
||||
}
|
||||
|
||||
/**
|
||||
* 脚本是否带语言
|
||||
*/
|
||||
private boolean withLanguage() {
|
||||
return StrUtil.isNotBlank(super.config.getScriptLanguageField());
|
||||
}
|
||||
}
|
||||
@@ -1,126 +0,0 @@
|
||||
package com.yomahub.liteflow.parser.sql.util;
|
||||
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import cn.hutool.crypto.digest.DigestUtil;
|
||||
import com.yomahub.liteflow.builder.el.LiteFlowChainELBuilder;
|
||||
import com.yomahub.liteflow.flow.FlowBus;
|
||||
import com.yomahub.liteflow.log.LFLog;
|
||||
import com.yomahub.liteflow.log.LFLoggerManager;
|
||||
import com.yomahub.liteflow.parser.sql.exception.ELSQLException;
|
||||
import com.yomahub.liteflow.parser.sql.vo.SQLParserVO;
|
||||
|
||||
import java.sql.Connection;
|
||||
import java.sql.PreparedStatement;
|
||||
import java.sql.ResultSet;
|
||||
import java.sql.SQLException;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* 用于轮询chain的定时任务
|
||||
*
|
||||
* @author hxinyu
|
||||
* @since 2.11.1
|
||||
*/
|
||||
public class ChainPollingTask implements Runnable {
|
||||
|
||||
private static final String SQL_PATTERN = "SELECT {},{} FROM {} WHERE {}=?";
|
||||
|
||||
private Connection conn;
|
||||
|
||||
private SQLParserVO sqlParserVO;
|
||||
|
||||
private Map<String, String> chainSHAMap;
|
||||
|
||||
private static final Integer FETCH_SIZE_MAX = 1000;
|
||||
|
||||
LFLog LOG = LFLoggerManager.getLogger(ChainPollingTask.class);
|
||||
|
||||
public ChainPollingTask(SQLParserVO sqlParserVO, Map<String, String> chainSHAMap) {
|
||||
this.sqlParserVO = sqlParserVO;
|
||||
this.chainSHAMap = chainSHAMap;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
conn = LiteFlowJdbcUtil.getConn(sqlParserVO);
|
||||
PreparedStatement stmt = null;
|
||||
ResultSet rs = null;
|
||||
try{
|
||||
String chainTableName = sqlParserVO.getChainTableName();
|
||||
String elDataField = sqlParserVO.getElDataField();
|
||||
String chainNameField = sqlParserVO.getChainNameField();
|
||||
String chainApplicationNameField = sqlParserVO.getChainApplicationNameField();
|
||||
String applicationName = sqlParserVO.getApplicationName();
|
||||
|
||||
String sqlCmd = StrUtil.format(SQL_PATTERN, chainNameField, elDataField, chainTableName,
|
||||
chainApplicationNameField);
|
||||
stmt = conn.prepareStatement(sqlCmd, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
|
||||
// 设置游标拉取数量
|
||||
stmt.setFetchSize(FETCH_SIZE_MAX);
|
||||
stmt.setString(1, applicationName);
|
||||
rs = stmt.executeQuery();
|
||||
|
||||
Set<String> newChainSet = new HashSet<>();
|
||||
|
||||
while(rs.next()) {
|
||||
String chainName = getStringFromResultSet(rs, chainNameField);
|
||||
String newData = getStringFromResultSet(rs, elDataField);
|
||||
String newSHA = DigestUtil.sha1Hex(newData);
|
||||
newChainSet.add(chainName);
|
||||
//如果封装的SHAMap中不存在该chain, 表示该chain为新增
|
||||
if(!chainSHAMap.containsKey(chainName)) {
|
||||
//新增chain
|
||||
LiteFlowChainELBuilder.createChain().setChainId(chainName).setEL(newData).build();
|
||||
LOG.info("starting reload flow config... create chain={}, new value={},", chainName, newData);
|
||||
//加入到shaMap
|
||||
chainSHAMap.put(chainName, newSHA);
|
||||
|
||||
}
|
||||
else if (!StrUtil.equals(newSHA, chainSHAMap.get(chainName))) {
|
||||
//SHA值发生变化,表示该chain的值已被修改,重新拉取变化的chain
|
||||
//修改chain
|
||||
LiteFlowChainELBuilder.createChain().setChainId(chainName).setEL(newData).build();
|
||||
LOG.info("starting reload flow config... update chain={}, new value={},", chainName, newData);
|
||||
//修改shaMap
|
||||
chainSHAMap.put(chainName, newSHA);
|
||||
}
|
||||
//SHA值无变化,表示该chain未改变
|
||||
}
|
||||
|
||||
if(chainSHAMap.size() > newChainSet.size()) {
|
||||
//如果遍历prepareStatement后修改过的SHAMap数量比最新chain总数多, 说明有两种情况:
|
||||
// 1、删除了chain
|
||||
// 2、修改了chainName:因为遍历到新的name时会加到SHAMap里,但没有机会删除旧的chain
|
||||
// 3、上述两者结合
|
||||
//在此处遍历chainSHAMap,把不在newChainSet中的chain删除
|
||||
//这里用iterator是为避免在遍历集合时删除元素导致ConcurrentModificationException
|
||||
Iterator<String> iterator = chainSHAMap.keySet().iterator();
|
||||
while(iterator.hasNext()) {
|
||||
String chainName = iterator.next();
|
||||
if(!newChainSet.contains(chainName)) {
|
||||
FlowBus.removeChain(chainName);
|
||||
LOG.info("starting reload flow config... delete chain={}", chainName);
|
||||
//修改SHAMap
|
||||
iterator.remove();
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOG.error("[Exception during SQL chain polling] " + e.getMessage(), e);
|
||||
} finally {
|
||||
// 关闭连接
|
||||
LiteFlowJdbcUtil.close(conn, stmt, rs);
|
||||
}
|
||||
}
|
||||
|
||||
private String getStringFromResultSet(ResultSet rs, String field) throws SQLException {
|
||||
String data = rs.getString(field);
|
||||
if (StrUtil.isBlank(data)) {
|
||||
throw new ELSQLException(StrUtil.format("exist {} field value is empty", field));
|
||||
}
|
||||
return data;
|
||||
}
|
||||
}
|
||||
@@ -5,22 +5,21 @@ import cn.hutool.core.thread.NamedThreadFactory;
|
||||
import cn.hutool.core.util.ObjectUtil;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import cn.hutool.core.util.XmlUtil;
|
||||
import cn.hutool.crypto.digest.DigestUtil;
|
||||
import com.yomahub.liteflow.enums.NodeTypeEnum;
|
||||
import com.yomahub.liteflow.enums.ScriptTypeEnum;
|
||||
import com.yomahub.liteflow.parser.sql.exception.ELSQLException;
|
||||
import com.yomahub.liteflow.parser.sql.vo.SQLParserVO;
|
||||
import com.yomahub.liteflow.parser.constant.ReadType;
|
||||
|
||||
import java.sql.Connection;
|
||||
import java.sql.PreparedStatement;
|
||||
import java.sql.ResultSet;
|
||||
import java.sql.SQLException;
|
||||
import com.yomahub.liteflow.parser.helper.NodeConvertHelper;
|
||||
import com.yomahub.liteflow.parser.sql.exception.ELSQLException;
|
||||
import com.yomahub.liteflow.parser.sql.read.SqlRead;
|
||||
import com.yomahub.liteflow.parser.sql.read.SqlReadFactory;
|
||||
import com.yomahub.liteflow.parser.sql.vo.SQLParserVO;
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static com.yomahub.liteflow.parser.constant.SqlReadConstant.*;
|
||||
/**
|
||||
* jdbc 工具类
|
||||
*
|
||||
@@ -29,26 +28,6 @@ import java.util.concurrent.TimeUnit;
|
||||
*/
|
||||
public class JDBCHelper {
|
||||
|
||||
private static final String SQL_PATTERN = "SELECT {},{} FROM {} WHERE {}=?";
|
||||
|
||||
private static final String SCRIPT_SQL_CHECK_PATTERN = "SELECT 1 FROM {} WHERE {}=?";
|
||||
|
||||
private static final String SCRIPT_SQL_PATTERN = "SELECT {},{},{},{} FROM {} WHERE {}=?";
|
||||
|
||||
private static final String SCRIPT_WITH_LANGUAG_SQL_PATTERN = "SELECT {},{},{},{},{} FROM {} WHERE {}=?";
|
||||
|
||||
private static final String CHAIN_XML_PATTERN = "<chain name=\"{}\"><![CDATA[{}]]></chain>";
|
||||
|
||||
private static final String NODE_XML_PATTERN = "<nodes>{}</nodes>";
|
||||
|
||||
private static final String NODE_ITEM_XML_PATTERN = "<node id=\"{}\" name=\"{}\" type=\"{}\"><![CDATA[{}]]></node>";
|
||||
|
||||
private static final String NODE_ITEM_WITH_LANGUAGE_XML_PATTERN = "<node id=\"{}\" name=\"{}\" type=\"{}\" language=\"{}\"><![CDATA[{}]]></node>";
|
||||
|
||||
private static final String XML_PATTERN = "<?xml version=\"1.0\" encoding=\"UTF-8\"?><flow>{}{}</flow>";
|
||||
|
||||
private static final Integer FETCH_SIZE_MAX = 1000;
|
||||
|
||||
private SQLParserVO sqlParserVO;
|
||||
|
||||
private static JDBCHelper INSTANCE;
|
||||
@@ -59,12 +38,6 @@ public class JDBCHelper {
|
||||
//定时任务线程池
|
||||
private static ScheduledThreadPoolExecutor pollExecutor;
|
||||
|
||||
//chain的SHA1加密值 用于轮询时确定chain是否变化
|
||||
private Map<String/*chainId*/, String/*chain的SHA1加密值*/> chainSHAMap = new HashMap<>();
|
||||
|
||||
//script的SHA1加密值 用于轮询时确定script是否变化
|
||||
private Map<String/*id:type:name*/, String/*script的SHA1加密值*/> scriptSHAMap = new HashMap<>();
|
||||
|
||||
/**
|
||||
* 初始化 INSTANCE
|
||||
*/
|
||||
@@ -75,13 +48,10 @@ public class JDBCHelper {
|
||||
Class.forName(sqlParserVO.getDriverClassName());
|
||||
}
|
||||
INSTANCE.setSqlParserVO(sqlParserVO);
|
||||
//创建定时任务线程池
|
||||
// 创建定时任务线程池
|
||||
if (sqlParserVO.getPollingEnabled() && ObjectUtil.isNull(getPollExecutor())) {
|
||||
ThreadFactory namedThreadFactory = new NamedThreadFactory("SQL-Polling-", false);
|
||||
ScheduledThreadPoolExecutor threadPoolExecutor = new ScheduledThreadPoolExecutor(
|
||||
CORE_POOL_SIZE,
|
||||
namedThreadFactory,
|
||||
new ThreadPoolExecutor.DiscardOldestPolicy());
|
||||
ScheduledThreadPoolExecutor threadPoolExecutor = new ScheduledThreadPoolExecutor(CORE_POOL_SIZE, namedThreadFactory, new ThreadPoolExecutor.DiscardOldestPolicy());
|
||||
setPollExecutor(threadPoolExecutor);
|
||||
}
|
||||
} catch (ClassNotFoundException e) {
|
||||
@@ -100,64 +70,38 @@ public class JDBCHelper {
|
||||
* 获取 ElData 数据内容
|
||||
*/
|
||||
public String getContent() {
|
||||
Connection conn = null;
|
||||
PreparedStatement stmt = null;
|
||||
ResultSet rs = null;
|
||||
SqlRead chainRead = SqlReadFactory.getSqlRead(ReadType.CHAIN);
|
||||
SqlRead scriptRead = SqlReadFactory.getSqlRead(ReadType.SCRIPT);
|
||||
|
||||
String chainTableName = sqlParserVO.getChainTableName();
|
||||
String elDataField = sqlParserVO.getElDataField();
|
||||
String chainNameField = sqlParserVO.getChainNameField();
|
||||
String chainApplicationNameField = sqlParserVO.getChainApplicationNameField();
|
||||
String applicationName = sqlParserVO.getApplicationName();
|
||||
// 获取 chain 数据
|
||||
Map<String, String> chainMap = chainRead.read();
|
||||
List<String> chainList = new ArrayList<>();
|
||||
chainMap.forEach((chainName, elData) -> {
|
||||
chainList.add(StrUtil.format(CHAIN_XML_PATTERN, XmlUtil.escape(chainName), elData));
|
||||
});
|
||||
String chainsContent = CollUtil.join(chainList, StrUtil.EMPTY);
|
||||
|
||||
if (StrUtil.isBlank(chainTableName)) {
|
||||
throw new ELSQLException("You did not define the chainTableName property");
|
||||
}
|
||||
// 获取脚本数据
|
||||
Map<String, String> scriptMap = scriptRead.read();
|
||||
List<String> scriptList = new ArrayList<>();
|
||||
scriptMap.forEach((scriptKey, elData) -> {
|
||||
NodeConvertHelper.NodeSimpleVO scriptVO = NodeConvertHelper.convert(scriptKey);
|
||||
String id = scriptVO.getNodeId();
|
||||
String name = scriptVO.getName();
|
||||
String type = scriptVO.getType();
|
||||
String language = scriptVO.getLanguage();
|
||||
|
||||
if (StrUtil.isBlank(applicationName) || StrUtil.isBlank(chainApplicationNameField)) {
|
||||
throw new ELSQLException("You did not define the applicationName or chainApplicationNameField property");
|
||||
}
|
||||
|
||||
String sqlCmd = StrUtil.format(SQL_PATTERN, chainNameField, elDataField, chainTableName,
|
||||
chainApplicationNameField);
|
||||
|
||||
List<String> result = new ArrayList<>();
|
||||
try {
|
||||
conn = LiteFlowJdbcUtil.getConn(sqlParserVO);
|
||||
stmt = conn.prepareStatement(sqlCmd, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
|
||||
// 设置游标拉取数量
|
||||
stmt.setFetchSize(FETCH_SIZE_MAX);
|
||||
stmt.setString(1, applicationName);
|
||||
rs = stmt.executeQuery();
|
||||
|
||||
while (rs.next()) {
|
||||
String elData = getStringFromResultSet(rs, elDataField);
|
||||
String chainName = getStringFromResultSet(rs, chainNameField);
|
||||
|
||||
result.add(StrUtil.format(CHAIN_XML_PATTERN, XmlUtil.escape(chainName), elData));
|
||||
|
||||
//如果需要轮询 计算该chainData的SHA值
|
||||
if(sqlParserVO.getPollingEnabled()){
|
||||
String chainSHA = DigestUtil.sha1Hex(elData);
|
||||
chainSHAMap.put(chainName, chainSHA);
|
||||
}
|
||||
if (StringUtils.isNotBlank(scriptVO.getLanguage())) {
|
||||
scriptList.add(StrUtil.format(NODE_ITEM_WITH_LANGUAGE_XML_PATTERN, XmlUtil.escape(id), XmlUtil.escape(name),type, language, elData));
|
||||
} else {
|
||||
scriptList.add(StrUtil.format(NODE_ITEM_XML_PATTERN, XmlUtil.escape(id), XmlUtil.escape(name), type, elData));
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw new ELSQLException(e.getMessage());
|
||||
} finally {
|
||||
// 关闭连接
|
||||
LiteFlowJdbcUtil.close(conn, stmt, rs);
|
||||
}
|
||||
|
||||
String chainsContent = CollUtil.join(result, StrUtil.EMPTY);
|
||||
|
||||
String nodesContent;
|
||||
if (hasScriptData()) {
|
||||
nodesContent = getScriptNodes();
|
||||
} else {
|
||||
nodesContent = StrUtil.EMPTY;
|
||||
}
|
||||
});
|
||||
String nodesContent = StrUtil.format(NODE_XML_PATTERN, CollUtil.join(scriptList, StrUtil.EMPTY));
|
||||
|
||||
// 注册
|
||||
SqlReadFactory.registerSqlReadPollTask(ReadType.CHAIN, chainMap);
|
||||
SqlReadFactory.registerSqlReadPollTask(ReadType.SCRIPT, scriptMap);
|
||||
return StrUtil.format(XML_PATTERN, nodesContent, chainsContent);
|
||||
}
|
||||
|
||||
@@ -165,197 +109,22 @@ public class JDBCHelper {
|
||||
* 定时轮询拉取SQL中变化的数据
|
||||
*/
|
||||
public void listenSQL() {
|
||||
//添加轮询chain的定时任务
|
||||
ChainPollingTask chainTask = new ChainPollingTask(sqlParserVO, chainSHAMap);
|
||||
pollExecutor.scheduleAtFixedRate(chainTask, sqlParserVO.getPollingStartSeconds().longValue(),
|
||||
sqlParserVO.getPollingIntervalSeconds().longValue(), TimeUnit.SECONDS);
|
||||
if (hasScriptData()) {
|
||||
//添加轮询script的定时任务
|
||||
ScriptPollingTask scriptTask = new ScriptPollingTask(sqlParserVO, scriptSHAMap);
|
||||
pollExecutor.scheduleAtFixedRate(scriptTask, sqlParserVO.getPollingStartSeconds().longValue(),
|
||||
sqlParserVO.getPollingIntervalSeconds().longValue(), TimeUnit.SECONDS);
|
||||
}
|
||||
}
|
||||
// 添加轮询chain的定时任务
|
||||
pollExecutor.scheduleAtFixedRate(
|
||||
() -> SqlReadFactory.getSqlReadPollTask(ReadType.CHAIN).execute(),
|
||||
sqlParserVO.getPollingStartSeconds().longValue(),
|
||||
sqlParserVO.getPollingIntervalSeconds().longValue(),
|
||||
TimeUnit.SECONDS
|
||||
);
|
||||
|
||||
// 添加轮询script的定时任务
|
||||
pollExecutor.scheduleAtFixedRate(
|
||||
() -> SqlReadFactory.getSqlReadPollTask(ReadType.SCRIPT).execute(),
|
||||
sqlParserVO.getPollingStartSeconds().longValue(),
|
||||
sqlParserVO.getPollingIntervalSeconds().longValue(),
|
||||
TimeUnit.SECONDS
|
||||
);
|
||||
|
||||
/**
|
||||
* 获取脚本节点内容
|
||||
*/
|
||||
public String getScriptNodes() {
|
||||
String scriptLanguageField = sqlParserVO.getScriptLanguageField();
|
||||
if (StrUtil.isNotBlank(scriptLanguageField)) {
|
||||
return getScriptNodesWithLanguage();
|
||||
}
|
||||
|
||||
List<String> result = new ArrayList<>();
|
||||
Connection conn = null;
|
||||
PreparedStatement stmt = null;
|
||||
ResultSet rs = null;
|
||||
|
||||
String scriptTableName = sqlParserVO.getScriptTableName();
|
||||
String scriptIdField = sqlParserVO.getScriptIdField();
|
||||
String scriptDataField = sqlParserVO.getScriptDataField();
|
||||
String scriptNameField = sqlParserVO.getScriptNameField();
|
||||
String scriptTypeField = sqlParserVO.getScriptTypeField();
|
||||
String scriptApplicationNameField = sqlParserVO.getScriptApplicationNameField();
|
||||
String applicationName = sqlParserVO.getApplicationName();
|
||||
|
||||
if (StrUtil.isBlank(applicationName) || StrUtil.isBlank(scriptApplicationNameField)) {
|
||||
throw new ELSQLException("You did not define the applicationName or scriptApplicationNameField property");
|
||||
}
|
||||
|
||||
String sqlCmd = StrUtil.format(SCRIPT_SQL_PATTERN, scriptIdField, scriptDataField, scriptNameField,
|
||||
scriptTypeField, scriptTableName, scriptApplicationNameField);
|
||||
try {
|
||||
conn = LiteFlowJdbcUtil.getConn(sqlParserVO);
|
||||
stmt = conn.prepareStatement(sqlCmd, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
|
||||
// 设置游标拉取数量
|
||||
stmt.setFetchSize(FETCH_SIZE_MAX);
|
||||
stmt.setString(1, applicationName);
|
||||
rs = stmt.executeQuery();
|
||||
|
||||
while (rs.next()) {
|
||||
String id = getStringFromResultSet(rs, scriptIdField);
|
||||
String data = getStringFromResultSet(rs, scriptDataField);
|
||||
String name = getStringFromResultSet(rs, scriptNameField);
|
||||
String type = getStringFromResultSet(rs, scriptTypeField);
|
||||
|
||||
NodeTypeEnum nodeTypeEnum = NodeTypeEnum.getEnumByCode(type);
|
||||
if (Objects.isNull(nodeTypeEnum)) {
|
||||
throw new ELSQLException(StrUtil.format("Invalid type value[{}]", type));
|
||||
}
|
||||
|
||||
if (!nodeTypeEnum.isScript()) {
|
||||
throw new ELSQLException(StrUtil.format("The type value[{}] is not a script type", type));
|
||||
}
|
||||
|
||||
result.add(StrUtil.format(NODE_ITEM_XML_PATTERN, XmlUtil.escape(id), XmlUtil.escape(name), type, data));
|
||||
|
||||
//如果需要轮询 计算该scriptData的SHA值
|
||||
if(sqlParserVO.getPollingEnabled()){
|
||||
String scriptKey = StrUtil.join(":", id, type, name);
|
||||
String scriptSHA = DigestUtil.sha1Hex(data);
|
||||
scriptSHAMap.put(scriptKey, scriptSHA);
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw new ELSQLException(e.getMessage());
|
||||
} finally {
|
||||
// 关闭连接
|
||||
LiteFlowJdbcUtil.close(conn, stmt, rs);
|
||||
}
|
||||
return StrUtil.format(NODE_XML_PATTERN, CollUtil.join(result, StrUtil.EMPTY));
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取脚本节点(带语言)
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
public String getScriptNodesWithLanguage() {
|
||||
|
||||
List<String> result = new ArrayList<>();
|
||||
Connection conn = null;
|
||||
PreparedStatement stmt = null;
|
||||
ResultSet rs = null;
|
||||
|
||||
String scriptTableName = sqlParserVO.getScriptTableName();
|
||||
String scriptIdField = sqlParserVO.getScriptIdField();
|
||||
String scriptDataField = sqlParserVO.getScriptDataField();
|
||||
String scriptNameField = sqlParserVO.getScriptNameField();
|
||||
String scriptTypeField = sqlParserVO.getScriptTypeField();
|
||||
String scriptApplicationNameField = sqlParserVO.getScriptApplicationNameField();
|
||||
String applicationName = sqlParserVO.getApplicationName();
|
||||
String scriptLanguageField = sqlParserVO.getScriptLanguageField();
|
||||
|
||||
if (StrUtil.isBlank(applicationName) || StrUtil.isBlank(scriptApplicationNameField)) {
|
||||
throw new ELSQLException("You did not define the applicationName or scriptApplicationNameField property");
|
||||
}
|
||||
|
||||
String sqlCmd = StrUtil.format(SCRIPT_WITH_LANGUAG_SQL_PATTERN, scriptIdField, scriptDataField, scriptNameField,
|
||||
scriptTypeField, scriptLanguageField, scriptTableName, scriptApplicationNameField);
|
||||
try {
|
||||
conn = LiteFlowJdbcUtil.getConn(sqlParserVO);
|
||||
stmt = conn.prepareStatement(sqlCmd, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
|
||||
// 设置游标拉取数量
|
||||
stmt.setFetchSize(FETCH_SIZE_MAX);
|
||||
stmt.setString(1, applicationName);
|
||||
rs = stmt.executeQuery();
|
||||
|
||||
while (rs.next()) {
|
||||
String id = getStringFromResultSet(rs, scriptIdField);
|
||||
String data = getStringFromResultSet(rs, scriptDataField);
|
||||
String name = getStringFromResultSet(rs, scriptNameField);
|
||||
String type = getStringFromResultSet(rs, scriptTypeField);
|
||||
String language = getStringFromResultSet(rs, scriptLanguageField);
|
||||
|
||||
NodeTypeEnum nodeTypeEnum = NodeTypeEnum.getEnumByCode(type);
|
||||
if (Objects.isNull(nodeTypeEnum)) {
|
||||
throw new ELSQLException(StrUtil.format("Invalid type value[{}]", type));
|
||||
}
|
||||
|
||||
if (!nodeTypeEnum.isScript()) {
|
||||
throw new ELSQLException(StrUtil.format("The type value[{}] is not a script type", type));
|
||||
}
|
||||
|
||||
if (!ScriptTypeEnum.checkScriptType(language)) {
|
||||
throw new ELSQLException(StrUtil.format("The language value[{}] is invalid", language));
|
||||
}
|
||||
|
||||
result.add(StrUtil.format(NODE_ITEM_WITH_LANGUAGE_XML_PATTERN, XmlUtil.escape(id), XmlUtil.escape(name),
|
||||
type, language, data));
|
||||
|
||||
//如果需要轮询 计算该scriptData的SHA值
|
||||
if(sqlParserVO.getPollingEnabled()){
|
||||
String scriptKey = StrUtil.join(":", id, type, name, language);
|
||||
String scriptSHA = DigestUtil.sha1Hex(data);
|
||||
scriptSHAMap.put(scriptKey, scriptSHA);
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw new ELSQLException(e.getMessage());
|
||||
} finally {
|
||||
// 关闭连接
|
||||
LiteFlowJdbcUtil.close(conn, stmt, rs);
|
||||
}
|
||||
return StrUtil.format(NODE_XML_PATTERN, CollUtil.join(result, StrUtil.EMPTY));
|
||||
}
|
||||
|
||||
private boolean hasScriptData() {
|
||||
if (StrUtil.isBlank(sqlParserVO.getScriptTableName())) {
|
||||
return false;
|
||||
}
|
||||
|
||||
Connection conn = null;
|
||||
PreparedStatement stmt = null;
|
||||
ResultSet rs = null;
|
||||
String sqlCmd = StrUtil.format(SCRIPT_SQL_CHECK_PATTERN, sqlParserVO.getScriptTableName(),
|
||||
sqlParserVO.getScriptApplicationNameField());
|
||||
try {
|
||||
conn = LiteFlowJdbcUtil.getConn(sqlParserVO);
|
||||
stmt = conn.prepareStatement(sqlCmd, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
|
||||
stmt.setFetchSize(1);
|
||||
stmt.setString(1, sqlParserVO.getApplicationName());
|
||||
rs = stmt.executeQuery();
|
||||
return rs.next();
|
||||
} catch (Exception e) {
|
||||
return false;
|
||||
} finally {
|
||||
// 关闭连接
|
||||
LiteFlowJdbcUtil.close(conn, stmt, rs);
|
||||
}
|
||||
}
|
||||
|
||||
private String getStringFromResultSet(ResultSet rs, String field) throws SQLException {
|
||||
String data = rs.getString(field);
|
||||
if (StrUtil.isBlank(data)) {
|
||||
throw new ELSQLException(StrUtil.format("exist {} field value is empty", field));
|
||||
}
|
||||
return data;
|
||||
}
|
||||
|
||||
private SQLParserVO getSqlParserVO() {
|
||||
return sqlParserVO;
|
||||
}
|
||||
|
||||
private void setSqlParserVO(SQLParserVO sqlParserVO) {
|
||||
|
||||
@@ -1,145 +0,0 @@
|
||||
package com.yomahub.liteflow.parser.sql.util;
|
||||
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import cn.hutool.crypto.digest.DigestUtil;
|
||||
import com.yomahub.liteflow.flow.FlowBus;
|
||||
import com.yomahub.liteflow.log.LFLog;
|
||||
import com.yomahub.liteflow.log.LFLoggerManager;
|
||||
import com.yomahub.liteflow.parser.helper.NodeConvertHelper;
|
||||
import com.yomahub.liteflow.parser.sql.exception.ELSQLException;
|
||||
import com.yomahub.liteflow.parser.sql.vo.SQLParserVO;
|
||||
|
||||
import java.sql.Connection;
|
||||
import java.sql.PreparedStatement;
|
||||
import java.sql.ResultSet;
|
||||
import java.sql.SQLException;
|
||||
import java.util.*;
|
||||
|
||||
/**
|
||||
* 用于轮询script的定时任务
|
||||
*
|
||||
* @author hxinyu
|
||||
* @since 2.11.1
|
||||
*/
|
||||
public class ScriptPollingTask implements Runnable {
|
||||
|
||||
private static final String SQL_PATTERN = "SELECT {},{} FROM {} WHERE {}=?";
|
||||
|
||||
private static final String CONCAT_PATTERN = "CONCAT_WS(':',{},{},{}) as script_concat";
|
||||
|
||||
private static final String CONCAT_WITH_LANGUAGE_PATTERN = "CONCAT_WS(':',{},{},{},{}) as script_concat";
|
||||
|
||||
private static final String SCRIPT_KEY_FIELD = "script_concat";
|
||||
|
||||
private Connection conn;
|
||||
|
||||
private SQLParserVO sqlParserVO;
|
||||
|
||||
private Map<String, String> scriptSHAMap;
|
||||
|
||||
private static final Integer FETCH_SIZE_MAX = 1000;
|
||||
|
||||
LFLog LOG = LFLoggerManager.getLogger(ScriptPollingTask.class);
|
||||
|
||||
public ScriptPollingTask(SQLParserVO sqlParserVO, Map<String, String> scriptSHAMap) {
|
||||
this.sqlParserVO = sqlParserVO;
|
||||
this.scriptSHAMap = scriptSHAMap;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
conn = LiteFlowJdbcUtil.getConn(sqlParserVO);
|
||||
PreparedStatement stmt = null;
|
||||
ResultSet rs = null;
|
||||
try {
|
||||
String scriptTableName = sqlParserVO.getScriptTableName();
|
||||
String scriptIdField = sqlParserVO.getScriptIdField();
|
||||
String scriptDataField = sqlParserVO.getScriptDataField();
|
||||
String scriptNameField = sqlParserVO.getScriptNameField();
|
||||
String scriptTypeField = sqlParserVO.getScriptTypeField();
|
||||
String scriptApplicationNameField = sqlParserVO.getScriptApplicationNameField();
|
||||
String applicationName = sqlParserVO.getApplicationName();
|
||||
String scriptLanguageField = sqlParserVO.getScriptLanguageField();
|
||||
|
||||
String KeyField;
|
||||
if (StrUtil.isNotBlank(scriptLanguageField)) {
|
||||
KeyField = StrUtil.format(CONCAT_WITH_LANGUAGE_PATTERN, scriptIdField, scriptTypeField, scriptNameField, scriptLanguageField);
|
||||
} else {
|
||||
KeyField = StrUtil.format(CONCAT_PATTERN, scriptIdField, scriptTypeField, scriptNameField);
|
||||
}
|
||||
|
||||
String sqlCmd = StrUtil.format(SQL_PATTERN, KeyField, scriptDataField, scriptTableName, scriptApplicationNameField);
|
||||
stmt = conn.prepareStatement(sqlCmd, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
|
||||
// 设置游标拉取数量
|
||||
stmt.setFetchSize(FETCH_SIZE_MAX);
|
||||
stmt.setString(1, applicationName);
|
||||
rs = stmt.executeQuery();
|
||||
|
||||
Set<String> newScriptSet = new HashSet<>();
|
||||
|
||||
while (rs.next()) {
|
||||
String scriptKey = getStringFromResultSet(rs, SCRIPT_KEY_FIELD);
|
||||
String newData = getStringFromResultSet(rs, scriptDataField);
|
||||
String newSHA = DigestUtil.sha1Hex(newData);
|
||||
newScriptSet.add(scriptKey);
|
||||
//如果封装的SHAMap中不存在该script 表示该script为新增
|
||||
if (!scriptSHAMap.containsKey(scriptKey)) {
|
||||
NodeConvertHelper.NodeSimpleVO scriptVO = NodeConvertHelper.convert(scriptKey);
|
||||
//新增script
|
||||
NodeConvertHelper.changeScriptNode(scriptVO, newData);
|
||||
LOG.info("starting reload flow config... create script={}, new value={},", scriptKey, newData);
|
||||
|
||||
//加入到shaMap
|
||||
scriptSHAMap.put(scriptKey, newSHA);
|
||||
}
|
||||
else if (!StrUtil.equals(newSHA, scriptSHAMap.get(scriptKey))) {
|
||||
//SHA值发生变化,表示该script的值已被修改,重新拉取变化的script
|
||||
NodeConvertHelper.NodeSimpleVO scriptVO = NodeConvertHelper.convert(scriptKey);
|
||||
//修改script
|
||||
NodeConvertHelper.changeScriptNode(scriptVO, newData);
|
||||
LOG.info("starting reload flow config... update scriptId={}, new value={},", scriptVO.getNodeId(), newData);
|
||||
|
||||
//修改shaMap
|
||||
scriptSHAMap.put(scriptKey, newSHA);
|
||||
}
|
||||
//SHA值无变化,表示该chain未改变
|
||||
}
|
||||
|
||||
if(scriptSHAMap.size() > newScriptSet.size()) {
|
||||
//如果遍历prepareStatement后修改过的SHAMap数量比最新script总数多, 说明有两种情况:
|
||||
// 1、删除了script
|
||||
// 2、修改了script的id/name/type:因为遍历到新的script_key时会加到SHAMap里,但没有机会删除旧的script
|
||||
// 3、上述两者结合
|
||||
//在此处遍历scriptSHAMap,把不在newScriptSet中的script删除
|
||||
//这里用iterator是为避免在遍历集合时删除元素导致ConcurrentModificationException
|
||||
Iterator<String> iterator = scriptSHAMap.keySet().iterator();
|
||||
while(iterator.hasNext()){
|
||||
String scriptKey = iterator.next();
|
||||
if (!newScriptSet.contains(scriptKey)) {
|
||||
NodeConvertHelper.NodeSimpleVO scriptVO = NodeConvertHelper.convert(scriptKey);
|
||||
//删除script
|
||||
FlowBus.getNodeMap().remove(scriptVO.getNodeId());
|
||||
LOG.info("starting reload flow config... delete script={}", scriptKey);
|
||||
//修改SHAMap
|
||||
iterator.remove();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
LOG.error("[Exception during SQL script polling] " + e.getMessage(), e);
|
||||
} finally {
|
||||
// 关闭连接
|
||||
LiteFlowJdbcUtil.close(conn, stmt, rs);
|
||||
}
|
||||
}
|
||||
|
||||
private String getStringFromResultSet(ResultSet rs, String field) throws SQLException {
|
||||
String data = rs.getString(field);
|
||||
if (StrUtil.isBlank(data)) {
|
||||
throw new ELSQLException(StrUtil.format("exist {} field value is empty", field));
|
||||
}
|
||||
return data;
|
||||
}
|
||||
}
|
||||
@@ -90,15 +90,26 @@ public class SQLParserVO {
|
||||
*/
|
||||
private String scriptLanguageField;
|
||||
|
||||
/*轮询机制是否开启 默认不开启*/
|
||||
/**
|
||||
* 轮询机制是否开启 默认不开启
|
||||
*/
|
||||
private Boolean pollingEnabled = false;
|
||||
|
||||
/*轮询时间间隔(s) 默认120s*/
|
||||
/**
|
||||
* 轮询时间间隔(s) 默认60s
|
||||
*/
|
||||
private Integer pollingIntervalSeconds = 60;
|
||||
|
||||
/*规则配置后首次轮询的起始时间 默认为60s*/
|
||||
/**
|
||||
* 规则配置后首次轮询的起始时间 默认为60s
|
||||
*/
|
||||
private Integer pollingStartSeconds = 60;
|
||||
|
||||
/**
|
||||
* 是否开启sql日志
|
||||
*/
|
||||
private Boolean sqlLogEnabled = true;
|
||||
|
||||
public String getUrl() {
|
||||
return url;
|
||||
}
|
||||
@@ -257,4 +268,12 @@ public class SQLParserVO {
|
||||
public void setPollingStartSeconds(Integer pollingStartSeconds) {
|
||||
this.pollingStartSeconds = pollingStartSeconds;
|
||||
}
|
||||
|
||||
public Boolean getSqlLogEnabled() {
|
||||
return sqlLogEnabled;
|
||||
}
|
||||
|
||||
public void setSqlLogEnabled(Boolean sqlLogEnabled) {
|
||||
this.sqlLogEnabled = sqlLogEnabled;
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user