增加sql插件轮询功能

This commit is contained in:
houxinyu
2023-09-17 19:23:57 +08:00
parent d07768d231
commit f67f9a7961
4 changed files with 300 additions and 14 deletions

View File

@@ -13,6 +13,7 @@ import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
@@ -49,8 +50,6 @@ public class ChainPollingTask implements Runnable {
@Override
public void run() {
try{
PreparedStatement stmt;
ResultSet rs;
String chainTableName = sqlParserVO.getChainTableName();
String elDataField = sqlParserVO.getElDataField();
String chainNameField = sqlParserVO.getChainNameField();
@@ -60,11 +59,11 @@ public class ChainPollingTask implements Runnable {
String SHAField = StrUtil.format(SHA_PATTERN, elDataField);
String sqlCmd = StrUtil.format(SQL_PATTERN, chainNameField, SHAField, chainTableName,
chainApplicationNameField);
stmt = conn.prepareStatement(sqlCmd, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
PreparedStatement stmt = conn.prepareStatement(sqlCmd, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
// 设置游标拉取数量
stmt.setFetchSize(FETCH_SIZE_MAX);
stmt.setString(1, applicationName);
rs = stmt.executeQuery();
ResultSet rs = stmt.executeQuery();
Set<String> newChainSet = new HashSet<>();
@@ -81,7 +80,7 @@ public class ChainPollingTask implements Runnable {
String newELData = getStringFromResultSet(newChainRS, elDataField);
//新增chain
LiteFlowChainELBuilder.createChain().setChainId(chainName).setEL(newELData).build();
LOG.info("starting reload flow config... create key={} new value={},", chainName, newELData);
LOG.info("starting reload flow config... create chain={} new value={},", chainName, newELData);
//加入到shaMap
chainSHAMap.put(chainName, newSHA);
}
@@ -94,7 +93,7 @@ public class ChainPollingTask implements Runnable {
String newELData = getStringFromResultSet(newChainRS, elDataField);
//修改chain
LiteFlowChainELBuilder.createChain().setChainId(chainName).setEL(newELData).build();
LOG.info("starting reload flow config... update key={} new value={},", chainName, newELData);
LOG.info("starting reload flow config... update chain={} new value={},", chainName, newELData);
//修改shaMap
chainSHAMap.put(chainName, newSHA);
}
@@ -108,17 +107,20 @@ public class ChainPollingTask implements Runnable {
// 2、修改了chainName:因为遍历到新的name时会加到SHAMap里,但没有机会删除旧的chain
// 3、上述两者结合
//在此处遍历chainSHAMap,把不在newChainSet中的chain删除
for (String chainName : chainSHAMap.keySet()) {
if(!newChainSet.contains(chainName)){
//这里用iterator是为避免在遍历集合时删除元素导致ConcurrentModificationException
Iterator<String> iterator = chainSHAMap.keySet().iterator();
while(iterator.hasNext()) {
String chainName = iterator.next();
if(!newChainSet.contains(chainName)) {
FlowBus.removeChain(chainName);
LOG.info("starting reload flow config... delete chain={}", chainName);
//修改SHAMap
chainSHAMap.remove(chainName);
iterator.remove();
}
}
}
}catch (Exception e) {
}
catch (Exception e) {
LOG.error("[Exception during SQL chain polling] " + e.getMessage(), e);
}
}

View File

@@ -59,9 +59,12 @@ public class JDBCHelper {
//定时任务线程池
private static ScheduledThreadPoolExecutor pollExecutor;
//chain的SHA1加密值 用于轮询时确定elDataField是否变化
//chain的SHA1加密值 用于轮询时确定chain是否变化
private Map<String, String> chainSHAMap = new HashMap<>();
//script的SHA1加密值 用于轮询时确定script是否变化
private Map<String, String> scriptSHAMap = new HashMap<>();
/**
* 初始化 INSTANCE
*/
@@ -162,9 +165,16 @@ public class JDBCHelper {
* 定时轮询拉取SQL中变化的数据
*/
public void listenSQL() {
//添加轮询chain的定时任务
ChainPollingTask chainTask = new ChainPollingTask(sqlParserVO, chainSHAMap);
pollExecutor.scheduleAtFixedRate(chainTask, sqlParserVO.getPollingStartTime().longValue(),
sqlParserVO.getPollingInterval().longValue(), TimeUnit.SECONDS);
if (hasScriptData()) {
//添加轮询script的定时任务
ScriptPollingTask scriptTask = new ScriptPollingTask(sqlParserVO, scriptSHAMap);
pollExecutor.scheduleAtFixedRate(chainTask, sqlParserVO.getPollingStartTime().longValue(),
sqlParserVO.getPollingInterval().longValue(), TimeUnit.SECONDS);
}
}
@@ -220,6 +230,13 @@ public class JDBCHelper {
}
result.add(StrUtil.format(NODE_ITEM_XML_PATTERN, XmlUtil.escape(id), XmlUtil.escape(name), type, data));
//如果需要轮询 计算该scriptData的SHA值
if(sqlParserVO.getIfPolling()){
String scriptKey = StrUtil.join(":", id, name, type);
String scriptSHA = DigestUtil.sha1Hex(data);
scriptSHAMap.put(scriptKey, scriptSHA);
}
}
} catch (Exception e) {
throw new ELSQLException(e.getMessage());
@@ -287,6 +304,13 @@ public class JDBCHelper {
result.add(StrUtil.format(NODE_ITEM_WITH_LANGUAGE_XML_PATTERN, XmlUtil.escape(id), XmlUtil.escape(name),
type, language, data));
//如果需要轮询 计算该scriptData的SHA值
if(sqlParserVO.getIfPolling()){
String scriptKey = StrUtil.join(":", id, name, type, language);
String scriptSHA = DigestUtil.sha1Hex(data);
scriptSHAMap.put(scriptKey, scriptSHA);
}
}
} catch (Exception e) {
throw new ELSQLException(e.getMessage());

View File

@@ -0,0 +1,260 @@
package com.yomahub.liteflow.parser.sql.util;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.ReUtil;
import cn.hutool.core.util.StrUtil;
import com.yomahub.liteflow.builder.LiteFlowNodeBuilder;
import com.yomahub.liteflow.enums.NodeTypeEnum;
import com.yomahub.liteflow.flow.FlowBus;
import com.yomahub.liteflow.log.LFLog;
import com.yomahub.liteflow.log.LFLoggerManager;
import com.yomahub.liteflow.parser.sql.exception.ELSQLException;
import com.yomahub.liteflow.parser.sql.vo.SQLParserVO;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.*;
/**
* 用于轮询script的定时任务
*
* @author hxinyu
* @since 2.11.1
*/
public class ScriptPollingTask implements Runnable {
private static final String SQL_PATTERN = "SELECT {},{} FROM {} WHERE {}=?";
private static final String NEW_SCRIPT_PATTERN = "SELECT {} FROM {} WHERE {}=? AND {}=?";
private static final String CONCAT_PATTERN = "CONCAT_WS(':',{},{},{}) as script_concat";
private static final String CONCAT_WITH_LANGUAGE_PATTERN = "CONCAT_WS(':',{},{},{},{}) as script_concat";
private static final String SHA_PATTERN = "SHA1({})";
private static final String SCRIPT_KEY_FIELD = "script_concat";
public static Connection conn;
private SQLParserVO sqlParserVO;
private Map<String, String> scriptSHAMap;
private static final Integer FETCH_SIZE_MAX = 1000;
LFLog LOG = LFLoggerManager.getLogger(ScriptPollingTask.class);
public ScriptPollingTask(SQLParserVO sqlParserVO, Map<String, String> scriptSHAMap) {
this.sqlParserVO = sqlParserVO;
this.scriptSHAMap = scriptSHAMap;
conn = LiteFlowJdbcUtil.getConn(sqlParserVO);
}
@Override
public void run() {
try {
String scriptTableName = sqlParserVO.getScriptTableName();
String scriptIdField = sqlParserVO.getScriptIdField();
String scriptDataField = sqlParserVO.getScriptDataField();
String scriptNameField = sqlParserVO.getScriptNameField();
String scriptTypeField = sqlParserVO.getScriptTypeField();
String scriptApplicationNameField = sqlParserVO.getScriptApplicationNameField();
String applicationName = sqlParserVO.getApplicationName();
String scriptLanguageField = sqlParserVO.getScriptLanguageField();
String SHAField = StrUtil.format(SHA_PATTERN, scriptDataField);
String KeyField;
if (StrUtil.isNotBlank(scriptLanguageField)) {
KeyField = StrUtil.format(CONCAT_WITH_LANGUAGE_PATTERN, scriptIdField, scriptNameField, scriptTypeField, scriptLanguageField);
} else {
KeyField = StrUtil.format(CONCAT_PATTERN, scriptIdField, scriptNameField, scriptTypeField);
}
String sqlCmd = StrUtil.format(SQL_PATTERN, KeyField, SHAField, scriptTableName, scriptApplicationNameField);
PreparedStatement stmt = conn.prepareStatement(sqlCmd, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
// 设置游标拉取数量
stmt.setFetchSize(FETCH_SIZE_MAX);
stmt.setString(1, applicationName);
ResultSet rs = stmt.executeQuery();
Set<String> newScriptSet = new HashSet<>();
while (rs.next()) {
String scriptKey = getStringFromResultSet(rs, SCRIPT_KEY_FIELD);
String newSHA = getStringFromResultSet(rs, SHAField);
newScriptSet.add(scriptKey);
//如果封装的SHAMap中不存在该script 表示该script为新增
if (!scriptSHAMap.containsKey(scriptKey)) {
//获取新script内容
NodeSimpleVO scriptVO = convert(scriptKey);
ResultSet newScriptRS = getNewScriptRS(scriptDataField, scriptTableName, scriptIdField,
scriptVO.getNodeId(), scriptApplicationNameField, applicationName);
if(newScriptRS.next()) {
String newScriptData = getStringFromResultSet(newScriptRS, scriptDataField);
//新增script
changeScriptNode(scriptVO, newScriptData);
LOG.info("starting reload flow config... create script={} new value={},", scriptKey, newScriptData);
}
//加入到shaMap
scriptSHAMap.put(scriptKey, newSHA);
}
else if (!StrUtil.equals(newSHA, scriptSHAMap.get(scriptKey))) {
//SHA值发生变化,表示该script的值已被修改,重新拉取变化的script
//获取新script内容
NodeSimpleVO scriptVO = convert(scriptKey);
ResultSet newScriptRS = getNewScriptRS(scriptDataField, scriptTableName, scriptIdField,
scriptVO.getNodeId(), scriptApplicationNameField, applicationName);
if(newScriptRS.next()) {
String newScriptData = getStringFromResultSet(newScriptRS, scriptDataField);
//修改script
changeScriptNode(scriptVO, newScriptData);
LOG.info("starting reload flow config... update scriptId={} new value={},", scriptVO.getNodeId(), newScriptData);
}
//修改shaMap
scriptSHAMap.put(scriptKey, newSHA);
}
//SHA值无变化,表示该chain未改变
}
if(scriptSHAMap.size() > newScriptSet.size()) {
//如果遍历prepareStatement后修改过的SHAMap数量比最新script总数多, 说明有两种情况:
// 1、删除了script
// 2、修改了script的id/name/type:因为遍历到新的script_key时会加到SHAMap里,但没有机会删除旧的script
// 3、上述两者结合
//在此处遍历scriptSHAMap,把不在newScriptSet中的script删除
//这里用iterator是为避免在遍历集合时删除元素导致ConcurrentModificationException
Iterator<String> iterator = scriptSHAMap.keySet().iterator();
while(iterator.hasNext()){
String scriptKey = iterator.next();
if (!newScriptSet.contains(scriptKey)) {
NodeSimpleVO scriptVO = convert(scriptKey);
//删除script
FlowBus.getNodeMap().remove(scriptVO.getNodeId());
LOG.info("starting reload flow config... delete script={}", scriptKey);
//修改SHAMap
iterator.remove();
}
}
}
} catch (Exception e) {
LOG.error("[Exception during SQL script polling] " + e.getMessage(), e);
}
}
private ResultSet getNewScriptRS(String scriptDataField, String scriptTableName, String scriptIdField,
String scriptId, String scriptApplicationNameField, String applicationName) {
ResultSet rs = null;
String sqlCmd = StrUtil.format(NEW_SCRIPT_PATTERN, scriptDataField, scriptTableName,
scriptIdField, scriptApplicationNameField);
try{
PreparedStatement stmt = conn.prepareStatement(sqlCmd, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
stmt.setString(1, scriptId);
stmt.setString(2, applicationName);
rs = stmt.executeQuery();
}catch (Exception e) {
throw new ELSQLException(e.getMessage());
}
return rs;
}
private String getStringFromResultSet(ResultSet rs, String field) throws SQLException {
String data = rs.getString(field);
if (StrUtil.isBlank(data)) {
throw new ELSQLException(StrUtil.format("exist {} field value is empty", field));
}
return data;
}
/*script节点的修改/添加*/
private void changeScriptNode(NodeSimpleVO nodeSimpleVO, String newValue) {
// 有语言类型
if (StrUtil.isNotBlank(nodeSimpleVO.getLanguage())) {
LiteFlowNodeBuilder.createScriptNode()
.setId(nodeSimpleVO.getNodeId())
.setType(NodeTypeEnum.getEnumByCode(nodeSimpleVO.getType()))
.setName(nodeSimpleVO.getName())
.setScript(newValue)
.setLanguage(nodeSimpleVO.getLanguage())
.build();
}
// 没有语言类型
else {
LiteFlowNodeBuilder.createScriptNode()
.setId(nodeSimpleVO.getNodeId())
.setType(NodeTypeEnum.getEnumByCode(nodeSimpleVO.getType()))
.setName(nodeSimpleVO.getName())
.setScript(newValue)
.build();
}
}
private NodeSimpleVO convert(String scriptKey){
List<String> matchItemList = ReUtil.findAllGroup0("(?<=[^:]:)[^:]+|[^:]+(?=:[^:])", scriptKey);
if (CollUtil.isEmpty(matchItemList)) {
return null;
}
NodeSimpleVO nodeSimpleVO = new NodeSimpleVO();
if (matchItemList.size() > 1) {
nodeSimpleVO.setNodeId(matchItemList.get(0));
nodeSimpleVO.setType(matchItemList.get(1));
}
if (matchItemList.size() > 2) {
nodeSimpleVO.setName(matchItemList.get(2));
}
if (matchItemList.size() > 3) {
nodeSimpleVO.setLanguage(matchItemList.get(3));
}
return nodeSimpleVO;
}
class NodeSimpleVO {
private String nodeId;
private String type;
private String name = StrUtil.EMPTY;
private String language;
public String getNodeId() {
return nodeId;
}
public void setNodeId(String nodeId) {
this.nodeId = nodeId;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getLanguage() {
return language;
}
public void setLanguage(String language) {
this.language = language;
}
}
}

View File

@@ -93,8 +93,8 @@ public class SQLParserVO {
/*是否开启轮询机制 默认不开启*/
private Boolean ifPolling = false;
/*轮询时间间隔(s) 默认60s*/
private Integer pollingInterval = 60;
/*轮询时间间隔(s) 默认120s*/
private Integer pollingInterval = 120;
/*规则配置后首次轮询的起始时间 默认为60s*/
private Integer pollingStartTime = 60;