优化SQL轮询&抽取统一正则转换代码

This commit is contained in:
houxinyu
2023-09-20 20:40:31 +08:00
parent 9ff47e902c
commit 5f6650b5c8
10 changed files with 172 additions and 351 deletions

View File

@@ -1,6 +1,7 @@
package com.yomahub.liteflow.parser.sql.util;
import cn.hutool.core.util.StrUtil;
import cn.hutool.crypto.digest.DigestUtil;
import com.yomahub.liteflow.builder.el.LiteFlowChainELBuilder;
import com.yomahub.liteflow.flow.FlowBus;
import com.yomahub.liteflow.log.LFLog;
@@ -29,12 +30,6 @@ public class ChainPollingTask implements Runnable {
private static final String NEW_CHAIN_PATTERN = "SELECT {} FROM {} WHERE {}=? AND {}=?";
private static final String SHA_PATTERN = "SHA1({}) AS SHA1";
private static final String SHA_PATTERN_FOR_H2 = "RAWTOHEX(HASH('SHA-1', {})) AS SHA1";
private static final String SHA_FIELD_NAME = "SHA1";
public static Connection conn;
private SQLParserVO sqlParserVO;
@@ -53,6 +48,8 @@ public class ChainPollingTask implements Runnable {
@Override
public void run() {
PreparedStatement stmt = null;
ResultSet rs = null;
try{
String chainTableName = sqlParserVO.getChainTableName();
String elDataField = sqlParserVO.getElDataField();
@@ -60,52 +57,37 @@ public class ChainPollingTask implements Runnable {
String chainApplicationNameField = sqlParserVO.getChainApplicationNameField();
String applicationName = sqlParserVO.getApplicationName();
String SHAField = StrUtil.format(SHA_PATTERN, elDataField);
//h2数据库计算SHA的函数与MySQL不同
if(StrUtil.equals(sqlParserVO.getDriverClassName(), "org.h2.Driver")){
SHAField = StrUtil.format(SHA_PATTERN_FOR_H2, elDataField);
}
String sqlCmd = StrUtil.format(SQL_PATTERN, chainNameField, SHAField, chainTableName,
String sqlCmd = StrUtil.format(SQL_PATTERN, chainNameField, elDataField, chainTableName,
chainApplicationNameField);
PreparedStatement stmt = conn.prepareStatement(sqlCmd, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
stmt = conn.prepareStatement(sqlCmd, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
// 设置游标拉取数量
stmt.setFetchSize(FETCH_SIZE_MAX);
stmt.setString(1, applicationName);
ResultSet rs = stmt.executeQuery();
rs = stmt.executeQuery();
Set<String> newChainSet = new HashSet<>();
while(rs.next()) {
String chainName = getStringFromResultSet(rs, chainNameField);
String newSHA = getStringFromResultSet(rs, SHA_FIELD_NAME);
String newData = getStringFromResultSet(rs, elDataField);
String newSHA = DigestUtil.sha1Hex(newData);
newChainSet.add(chainName);
//如果封装的SHAMap中不存在该chain, 表示该chain为新增
if(!chainSHAMap.containsKey(chainName)) {
//获取新chain结果
ResultSet newChainRS = getNewChainRS(elDataField, chainTableName, chainNameField,
chainApplicationNameField, applicationName, chainName);
if(newChainRS.next()) {
String newELData = getStringFromResultSet(newChainRS, elDataField);
//新增chain
LiteFlowChainELBuilder.createChain().setChainId(chainName).setEL(newELData).build();
LOG.info("starting reload flow config... create chain={}, new value={},", chainName, newELData);
//加入到shaMap
chainSHAMap.put(chainName, newSHA);
}
//新chain
LiteFlowChainELBuilder.createChain().setChainId(chainName).setEL(newData).build();
LOG.info("starting reload flow config... create chain={}, new value={},", chainName, newData);
//加入到shaMap
chainSHAMap.put(chainName, newSHA);
}
else if (!StrUtil.equals(newSHA, chainSHAMap.get(chainName))) {
//SHA值发生变化,表示该chain的值已被修改,重新拉取变化的chain
ResultSet newChainRS = getNewChainRS(elDataField, chainTableName, chainNameField,
chainApplicationNameField, applicationName, chainName);
if(newChainRS.next()) {
String newELData = getStringFromResultSet(newChainRS, elDataField);
//修改chain
LiteFlowChainELBuilder.createChain().setChainId(chainName).setEL(newELData).build();
LOG.info("starting reload flow config... update chain={}, new value={},", chainName, newELData);
//修改shaMap
chainSHAMap.put(chainName, newSHA);
}
//修改chain
LiteFlowChainELBuilder.createChain().setChainId(chainName).setEL(newData).build();
LOG.info("starting reload flow config... update chain={}, new value={},", chainName, newData);
//修改shaMap
chainSHAMap.put(chainName, newSHA);
}
//SHA值无变化,表示该chain未改变
}
@@ -128,28 +110,14 @@ public class ChainPollingTask implements Runnable {
}
}
}
}
catch (Exception e) {
} catch (Exception e) {
LOG.error("[Exception during SQL chain polling] " + e.getMessage(), e);
} finally {
// 关闭连接
LiteFlowJdbcUtil.close(null, stmt, rs);
}
}
private ResultSet getNewChainRS(String elDataField, String chainTableName, String chainNameField,
String chainApplicationNameField, String applicationName, String chainName) {
ResultSet rs = null;
String sqlCmd = StrUtil.format(NEW_CHAIN_PATTERN, elDataField, chainTableName,
chainNameField, chainApplicationNameField);
try{
PreparedStatement stmt = conn.prepareStatement(sqlCmd, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
stmt.setString(1, chainName);
stmt.setString(2, applicationName);
rs = stmt.executeQuery();
}catch (Exception e) {
throw new ELSQLException(e.getMessage());
}
return rs;
}
private String getStringFromResultSet(ResultSet rs, String field) throws SQLException {
String data = rs.getString(field);
if (StrUtil.isBlank(data)) {

View File

@@ -60,10 +60,10 @@ public class JDBCHelper {
private static ScheduledThreadPoolExecutor pollExecutor;
//chain的SHA1加密值 用于轮询时确定chain是否变化
private Map<String, String> chainSHAMap = new HashMap<>();
private Map<String/*chainId*/, String/*chain的SHA1加密值*/> chainSHAMap = new HashMap<>();
//script的SHA1加密值 用于轮询时确定script是否变化
private Map<String, String> scriptSHAMap = new HashMap<>();
private Map<String/*id:type:name*/, String/*script的SHA1加密值*/> scriptSHAMap = new HashMap<>();
/**
* 初始化 INSTANCE

View File

@@ -1,13 +1,11 @@
package com.yomahub.liteflow.parser.sql.util;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.ReUtil;
import cn.hutool.core.util.StrUtil;
import com.yomahub.liteflow.builder.LiteFlowNodeBuilder;
import com.yomahub.liteflow.enums.NodeTypeEnum;
import cn.hutool.crypto.digest.DigestUtil;
import com.yomahub.liteflow.flow.FlowBus;
import com.yomahub.liteflow.log.LFLog;
import com.yomahub.liteflow.log.LFLoggerManager;
import com.yomahub.liteflow.parser.helper.NodeConvertHelper;
import com.yomahub.liteflow.parser.sql.exception.ELSQLException;
import com.yomahub.liteflow.parser.sql.vo.SQLParserVO;
@@ -33,12 +31,6 @@ public class ScriptPollingTask implements Runnable {
private static final String CONCAT_WITH_LANGUAGE_PATTERN = "CONCAT_WS(':',{},{},{},{}) as script_concat";
private static final String SHA_PATTERN = "SHA1({}) AS SHA1";
private static final String SHA_PATTERN_FOR_H2 = "RAWTOHEX(HASH('SHA-1', {})) AS SHA1";
private static final String SHA_FIELD_NAME = "SHA1";
private static final String SCRIPT_KEY_FIELD = "script_concat";
public static Connection conn;
@@ -60,6 +52,8 @@ public class ScriptPollingTask implements Runnable {
@Override
public void run() {
PreparedStatement stmt = null;
ResultSet rs = null;
try {
String scriptTableName = sqlParserVO.getScriptTableName();
String scriptIdField = sqlParserVO.getScriptIdField();
@@ -70,12 +64,6 @@ public class ScriptPollingTask implements Runnable {
String applicationName = sqlParserVO.getApplicationName();
String scriptLanguageField = sqlParserVO.getScriptLanguageField();
String SHAField = StrUtil.format(SHA_PATTERN, scriptDataField);
//h2数据库计算SHA的函数与MySQL不同
if(StrUtil.equals(sqlParserVO.getDriverClassName(), "org.h2.Driver")){
SHAField = StrUtil.format(SHA_PATTERN_FOR_H2, scriptDataField);
}
String KeyField;
if (StrUtil.isNotBlank(scriptLanguageField)) {
KeyField = StrUtil.format(CONCAT_WITH_LANGUAGE_PATTERN, scriptIdField, scriptTypeField, scriptNameField, scriptLanguageField);
@@ -83,46 +71,37 @@ public class ScriptPollingTask implements Runnable {
KeyField = StrUtil.format(CONCAT_PATTERN, scriptIdField, scriptTypeField, scriptNameField);
}
String sqlCmd = StrUtil.format(SQL_PATTERN, KeyField, SHAField, scriptTableName, scriptApplicationNameField);
PreparedStatement stmt = conn.prepareStatement(sqlCmd, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
String sqlCmd = StrUtil.format(SQL_PATTERN, KeyField, scriptDataField, scriptTableName, scriptApplicationNameField);
stmt = conn.prepareStatement(sqlCmd, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
// 设置游标拉取数量
stmt.setFetchSize(FETCH_SIZE_MAX);
stmt.setString(1, applicationName);
ResultSet rs = stmt.executeQuery();
rs = stmt.executeQuery();
Set<String> newScriptSet = new HashSet<>();
while (rs.next()) {
String scriptKey = getStringFromResultSet(rs, SCRIPT_KEY_FIELD);
String newSHA = getStringFromResultSet(rs, SHA_FIELD_NAME);
String newData = getStringFromResultSet(rs, scriptDataField);
String newSHA = DigestUtil.sha1Hex(newData);
newScriptSet.add(scriptKey);
//如果封装的SHAMap中不存在该script 表示该script为新增
if (!scriptSHAMap.containsKey(scriptKey)) {
//获取新script内容
NodeSimpleVO scriptVO = convert(scriptKey);
ResultSet newScriptRS = getNewScriptRS(scriptDataField, scriptTableName, scriptIdField,
scriptVO.getNodeId(), scriptApplicationNameField, applicationName);
if(newScriptRS.next()) {
String newScriptData = getStringFromResultSet(newScriptRS, scriptDataField);
//新增script
changeScriptNode(scriptVO, newScriptData);
LOG.info("starting reload flow config... create script={}, new value={},", scriptKey, newScriptData);
}
NodeConvertHelper.NodeSimpleVO scriptVO = NodeConvertHelper.convert(scriptKey);
//新增script
NodeConvertHelper.changeScriptNode(scriptVO, newData);
LOG.info("starting reload flow config... create script={}, new value={},", scriptKey, newData);
//加入到shaMap
scriptSHAMap.put(scriptKey, newSHA);
}
else if (!StrUtil.equals(newSHA, scriptSHAMap.get(scriptKey))) {
//SHA值发生变化,表示该script的值已被修改,重新拉取变化的script
//获取新script内容
NodeSimpleVO scriptVO = convert(scriptKey);
ResultSet newScriptRS = getNewScriptRS(scriptDataField, scriptTableName, scriptIdField,
scriptVO.getNodeId(), scriptApplicationNameField, applicationName);
if(newScriptRS.next()) {
String newScriptData = getStringFromResultSet(newScriptRS, scriptDataField);
//修改script
changeScriptNode(scriptVO, newScriptData);
LOG.info("starting reload flow config... update scriptId={}, new value={},", scriptVO.getNodeId(), newScriptData);
}
NodeConvertHelper.NodeSimpleVO scriptVO = NodeConvertHelper.convert(scriptKey);
//修改script
NodeConvertHelper.changeScriptNode(scriptVO, newData);
LOG.info("starting reload flow config... update scriptId={}, new value={},", scriptVO.getNodeId(), newData);
//修改shaMap
scriptSHAMap.put(scriptKey, newSHA);
}
@@ -140,7 +119,7 @@ public class ScriptPollingTask implements Runnable {
while(iterator.hasNext()){
String scriptKey = iterator.next();
if (!newScriptSet.contains(scriptKey)) {
NodeSimpleVO scriptVO = convert(scriptKey);
NodeConvertHelper.NodeSimpleVO scriptVO = NodeConvertHelper.convert(scriptKey);
//删除script
FlowBus.getNodeMap().remove(scriptVO.getNodeId());
LOG.info("starting reload flow config... delete script={}", scriptKey);
@@ -152,25 +131,12 @@ public class ScriptPollingTask implements Runnable {
} catch (Exception e) {
LOG.error("[Exception during SQL script polling] " + e.getMessage(), e);
} finally {
// 关闭连接
LiteFlowJdbcUtil.close(null, stmt, rs);
}
}
private ResultSet getNewScriptRS(String scriptDataField, String scriptTableName, String scriptIdField,
String scriptId, String scriptApplicationNameField, String applicationName) {
ResultSet rs = null;
String sqlCmd = StrUtil.format(NEW_SCRIPT_PATTERN, scriptDataField, scriptTableName,
scriptIdField, scriptApplicationNameField);
try{
PreparedStatement stmt = conn.prepareStatement(sqlCmd, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
stmt.setString(1, scriptId);
stmt.setString(2, applicationName);
rs = stmt.executeQuery();
}catch (Exception e) {
throw new ELSQLException(e.getMessage());
}
return rs;
}
private String getStringFromResultSet(ResultSet rs, String field) throws SQLException {
String data = rs.getString(field);
if (StrUtil.isBlank(data)) {
@@ -178,92 +144,4 @@ public class ScriptPollingTask implements Runnable {
}
return data;
}
/*script节点的修改/添加*/
private void changeScriptNode(NodeSimpleVO nodeSimpleVO, String newValue) {
// 有语言类型
if (StrUtil.isNotBlank(nodeSimpleVO.getLanguage())) {
LiteFlowNodeBuilder.createScriptNode()
.setId(nodeSimpleVO.getNodeId())
.setType(NodeTypeEnum.getEnumByCode(nodeSimpleVO.getType()))
.setName(nodeSimpleVO.getName())
.setScript(newValue)
.setLanguage(nodeSimpleVO.getLanguage())
.build();
}
// 没有语言类型
else {
LiteFlowNodeBuilder.createScriptNode()
.setId(nodeSimpleVO.getNodeId())
.setType(NodeTypeEnum.getEnumByCode(nodeSimpleVO.getType()))
.setName(nodeSimpleVO.getName())
.setScript(newValue)
.build();
}
}
private NodeSimpleVO convert(String scriptKey){
List<String> matchItemList = ReUtil.findAllGroup0("(?<=[^:]:)[^:]+|[^:]+(?=:[^:])", scriptKey);
if (CollUtil.isEmpty(matchItemList)) {
return null;
}
NodeSimpleVO nodeSimpleVO = new NodeSimpleVO();
if (matchItemList.size() > 1) {
nodeSimpleVO.setNodeId(matchItemList.get(0));
nodeSimpleVO.setType(matchItemList.get(1));
}
if (matchItemList.size() > 2) {
nodeSimpleVO.setName(matchItemList.get(2));
}
if (matchItemList.size() > 3) {
nodeSimpleVO.setLanguage(matchItemList.get(3));
}
return nodeSimpleVO;
}
class NodeSimpleVO {
private String nodeId;
private String type;
private String name = StrUtil.EMPTY;
private String language;
public String getNodeId() {
return nodeId;
}
public void setNodeId(String nodeId) {
this.nodeId = nodeId;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getLanguage() {
return language;
}
public void setLanguage(String language) {
this.language = language;
}
}
}