From f67f9a796106a1b24d1026bd141ff32555ea0226 Mon Sep 17 00:00:00 2001 From: houxinyu Date: Sun, 17 Sep 2023 19:23:57 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0sql=E6=8F=92=E4=BB=B6?= =?UTF-8?q?=E8=BD=AE=E8=AF=A2=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../parser/sql/util/ChainPollingTask.java | 24 +- .../liteflow/parser/sql/util/JDBCHelper.java | 26 +- .../parser/sql/util/ScriptPollingTask.java | 260 ++++++++++++++++++ .../liteflow/parser/sql/vo/SQLParserVO.java | 4 +- 4 files changed, 300 insertions(+), 14 deletions(-) create mode 100644 liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/util/ScriptPollingTask.java diff --git a/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/util/ChainPollingTask.java b/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/util/ChainPollingTask.java index 7082ee49c..647a7c9de 100644 --- a/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/util/ChainPollingTask.java +++ b/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/util/ChainPollingTask.java @@ -13,6 +13,7 @@ import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.util.HashSet; +import java.util.Iterator; import java.util.Map; import java.util.Set; @@ -49,8 +50,6 @@ public class ChainPollingTask implements Runnable { @Override public void run() { try{ - PreparedStatement stmt; - ResultSet rs; String chainTableName = sqlParserVO.getChainTableName(); String elDataField = sqlParserVO.getElDataField(); String chainNameField = sqlParserVO.getChainNameField(); @@ -60,11 +59,11 @@ public class ChainPollingTask implements Runnable { String SHAField = StrUtil.format(SHA_PATTERN, elDataField); String sqlCmd = StrUtil.format(SQL_PATTERN, chainNameField, SHAField, chainTableName, chainApplicationNameField); - stmt = conn.prepareStatement(sqlCmd, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + PreparedStatement stmt = conn.prepareStatement(sqlCmd, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); // 设置游标拉取数量 stmt.setFetchSize(FETCH_SIZE_MAX); stmt.setString(1, applicationName); - rs = stmt.executeQuery(); + ResultSet rs = stmt.executeQuery(); Set newChainSet = new HashSet<>(); @@ -81,7 +80,7 @@ public class ChainPollingTask implements Runnable { String newELData = getStringFromResultSet(newChainRS, elDataField); //新增chain LiteFlowChainELBuilder.createChain().setChainId(chainName).setEL(newELData).build(); - LOG.info("starting reload flow config... create key={} new value={},", chainName, newELData); + LOG.info("starting reload flow config... create chain={} new value={},", chainName, newELData); //加入到shaMap chainSHAMap.put(chainName, newSHA); } @@ -94,7 +93,7 @@ public class ChainPollingTask implements Runnable { String newELData = getStringFromResultSet(newChainRS, elDataField); //修改chain LiteFlowChainELBuilder.createChain().setChainId(chainName).setEL(newELData).build(); - LOG.info("starting reload flow config... update key={} new value={},", chainName, newELData); + LOG.info("starting reload flow config... update chain={} new value={},", chainName, newELData); //修改shaMap chainSHAMap.put(chainName, newSHA); } @@ -108,17 +107,20 @@ public class ChainPollingTask implements Runnable { // 2、修改了chainName:因为遍历到新的name时会加到SHAMap里,但没有机会删除旧的chain // 3、上述两者结合 //在此处遍历chainSHAMap,把不在newChainSet中的chain删除 - for (String chainName : chainSHAMap.keySet()) { - if(!newChainSet.contains(chainName)){ + //这里用iterator是为避免在遍历集合时删除元素导致ConcurrentModificationException + Iterator iterator = chainSHAMap.keySet().iterator(); + while(iterator.hasNext()) { + String chainName = iterator.next(); + if(!newChainSet.contains(chainName)) { FlowBus.removeChain(chainName); LOG.info("starting reload flow config... delete chain={}", chainName); //修改SHAMap - chainSHAMap.remove(chainName); + iterator.remove(); } } } - - }catch (Exception e) { + } + catch (Exception e) { LOG.error("[Exception during SQL chain polling] " + e.getMessage(), e); } } diff --git a/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/util/JDBCHelper.java b/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/util/JDBCHelper.java index 250242b8f..6276ec5db 100644 --- a/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/util/JDBCHelper.java +++ b/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/util/JDBCHelper.java @@ -59,9 +59,12 @@ public class JDBCHelper { //定时任务线程池 private static ScheduledThreadPoolExecutor pollExecutor; - //chain的SHA1加密值 用于轮询时确定elDataField是否变化 + //chain的SHA1加密值 用于轮询时确定chain是否变化 private Map chainSHAMap = new HashMap<>(); + //script的SHA1加密值 用于轮询时确定script是否变化 + private Map scriptSHAMap = new HashMap<>(); + /** * 初始化 INSTANCE */ @@ -162,9 +165,16 @@ public class JDBCHelper { * 定时轮询拉取SQL中变化的数据 */ public void listenSQL() { + //添加轮询chain的定时任务 ChainPollingTask chainTask = new ChainPollingTask(sqlParserVO, chainSHAMap); pollExecutor.scheduleAtFixedRate(chainTask, sqlParserVO.getPollingStartTime().longValue(), sqlParserVO.getPollingInterval().longValue(), TimeUnit.SECONDS); + if (hasScriptData()) { + //添加轮询script的定时任务 + ScriptPollingTask scriptTask = new ScriptPollingTask(sqlParserVO, scriptSHAMap); + pollExecutor.scheduleAtFixedRate(chainTask, sqlParserVO.getPollingStartTime().longValue(), + sqlParserVO.getPollingInterval().longValue(), TimeUnit.SECONDS); + } } @@ -220,6 +230,13 @@ public class JDBCHelper { } result.add(StrUtil.format(NODE_ITEM_XML_PATTERN, XmlUtil.escape(id), XmlUtil.escape(name), type, data)); + + //如果需要轮询 计算该scriptData的SHA值 + if(sqlParserVO.getIfPolling()){ + String scriptKey = StrUtil.join(":", id, name, type); + String scriptSHA = DigestUtil.sha1Hex(data); + scriptSHAMap.put(scriptKey, scriptSHA); + } } } catch (Exception e) { throw new ELSQLException(e.getMessage()); @@ -287,6 +304,13 @@ public class JDBCHelper { result.add(StrUtil.format(NODE_ITEM_WITH_LANGUAGE_XML_PATTERN, XmlUtil.escape(id), XmlUtil.escape(name), type, language, data)); + + //如果需要轮询 计算该scriptData的SHA值 + if(sqlParserVO.getIfPolling()){ + String scriptKey = StrUtil.join(":", id, name, type, language); + String scriptSHA = DigestUtil.sha1Hex(data); + scriptSHAMap.put(scriptKey, scriptSHA); + } } } catch (Exception e) { throw new ELSQLException(e.getMessage()); diff --git a/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/util/ScriptPollingTask.java b/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/util/ScriptPollingTask.java new file mode 100644 index 000000000..5dd40e243 --- /dev/null +++ b/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/util/ScriptPollingTask.java @@ -0,0 +1,260 @@ +package com.yomahub.liteflow.parser.sql.util; + +import cn.hutool.core.collection.CollUtil; +import cn.hutool.core.util.ReUtil; +import cn.hutool.core.util.StrUtil; +import com.yomahub.liteflow.builder.LiteFlowNodeBuilder; +import com.yomahub.liteflow.enums.NodeTypeEnum; +import com.yomahub.liteflow.flow.FlowBus; +import com.yomahub.liteflow.log.LFLog; +import com.yomahub.liteflow.log.LFLoggerManager; +import com.yomahub.liteflow.parser.sql.exception.ELSQLException; +import com.yomahub.liteflow.parser.sql.vo.SQLParserVO; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.*; + +/** + * 用于轮询script的定时任务 + * + * @author hxinyu + * @since 2.11.1 + */ +public class ScriptPollingTask implements Runnable { + + private static final String SQL_PATTERN = "SELECT {},{} FROM {} WHERE {}=?"; + + private static final String NEW_SCRIPT_PATTERN = "SELECT {} FROM {} WHERE {}=? AND {}=?"; + + private static final String CONCAT_PATTERN = "CONCAT_WS(':',{},{},{}) as script_concat"; + + private static final String CONCAT_WITH_LANGUAGE_PATTERN = "CONCAT_WS(':',{},{},{},{}) as script_concat"; + + private static final String SHA_PATTERN = "SHA1({})"; + + private static final String SCRIPT_KEY_FIELD = "script_concat"; + + public static Connection conn; + + private SQLParserVO sqlParserVO; + + private Map scriptSHAMap; + + private static final Integer FETCH_SIZE_MAX = 1000; + + LFLog LOG = LFLoggerManager.getLogger(ScriptPollingTask.class); + + public ScriptPollingTask(SQLParserVO sqlParserVO, Map scriptSHAMap) { + this.sqlParserVO = sqlParserVO; + this.scriptSHAMap = scriptSHAMap; + conn = LiteFlowJdbcUtil.getConn(sqlParserVO); + } + + + @Override + public void run() { + try { + String scriptTableName = sqlParserVO.getScriptTableName(); + String scriptIdField = sqlParserVO.getScriptIdField(); + String scriptDataField = sqlParserVO.getScriptDataField(); + String scriptNameField = sqlParserVO.getScriptNameField(); + String scriptTypeField = sqlParserVO.getScriptTypeField(); + String scriptApplicationNameField = sqlParserVO.getScriptApplicationNameField(); + String applicationName = sqlParserVO.getApplicationName(); + String scriptLanguageField = sqlParserVO.getScriptLanguageField(); + + String SHAField = StrUtil.format(SHA_PATTERN, scriptDataField); + String KeyField; + if (StrUtil.isNotBlank(scriptLanguageField)) { + KeyField = StrUtil.format(CONCAT_WITH_LANGUAGE_PATTERN, scriptIdField, scriptNameField, scriptTypeField, scriptLanguageField); + } else { + KeyField = StrUtil.format(CONCAT_PATTERN, scriptIdField, scriptNameField, scriptTypeField); + } + + String sqlCmd = StrUtil.format(SQL_PATTERN, KeyField, SHAField, scriptTableName, scriptApplicationNameField); + PreparedStatement stmt = conn.prepareStatement(sqlCmd, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + // 设置游标拉取数量 + stmt.setFetchSize(FETCH_SIZE_MAX); + stmt.setString(1, applicationName); + ResultSet rs = stmt.executeQuery(); + + Set newScriptSet = new HashSet<>(); + + while (rs.next()) { + String scriptKey = getStringFromResultSet(rs, SCRIPT_KEY_FIELD); + String newSHA = getStringFromResultSet(rs, SHAField); + newScriptSet.add(scriptKey); + //如果封装的SHAMap中不存在该script 表示该script为新增 + if (!scriptSHAMap.containsKey(scriptKey)) { + //获取新script内容 + NodeSimpleVO scriptVO = convert(scriptKey); + ResultSet newScriptRS = getNewScriptRS(scriptDataField, scriptTableName, scriptIdField, + scriptVO.getNodeId(), scriptApplicationNameField, applicationName); + if(newScriptRS.next()) { + String newScriptData = getStringFromResultSet(newScriptRS, scriptDataField); + //新增script + changeScriptNode(scriptVO, newScriptData); + LOG.info("starting reload flow config... create script={} new value={},", scriptKey, newScriptData); + } + //加入到shaMap + scriptSHAMap.put(scriptKey, newSHA); + } + else if (!StrUtil.equals(newSHA, scriptSHAMap.get(scriptKey))) { + //SHA值发生变化,表示该script的值已被修改,重新拉取变化的script + //获取新script内容 + NodeSimpleVO scriptVO = convert(scriptKey); + ResultSet newScriptRS = getNewScriptRS(scriptDataField, scriptTableName, scriptIdField, + scriptVO.getNodeId(), scriptApplicationNameField, applicationName); + if(newScriptRS.next()) { + String newScriptData = getStringFromResultSet(newScriptRS, scriptDataField); + //修改script + changeScriptNode(scriptVO, newScriptData); + LOG.info("starting reload flow config... update scriptId={} new value={},", scriptVO.getNodeId(), newScriptData); + } + //修改shaMap + scriptSHAMap.put(scriptKey, newSHA); + } + //SHA值无变化,表示该chain未改变 + } + + if(scriptSHAMap.size() > newScriptSet.size()) { + //如果遍历prepareStatement后修改过的SHAMap数量比最新script总数多, 说明有两种情况: + // 1、删除了script + // 2、修改了script的id/name/type:因为遍历到新的script_key时会加到SHAMap里,但没有机会删除旧的script + // 3、上述两者结合 + //在此处遍历scriptSHAMap,把不在newScriptSet中的script删除 + //这里用iterator是为避免在遍历集合时删除元素导致ConcurrentModificationException + Iterator iterator = scriptSHAMap.keySet().iterator(); + while(iterator.hasNext()){ + String scriptKey = iterator.next(); + if (!newScriptSet.contains(scriptKey)) { + NodeSimpleVO scriptVO = convert(scriptKey); + //删除script + FlowBus.getNodeMap().remove(scriptVO.getNodeId()); + LOG.info("starting reload flow config... delete script={}", scriptKey); + //修改SHAMap + iterator.remove(); + } + } + } + + } catch (Exception e) { + LOG.error("[Exception during SQL script polling] " + e.getMessage(), e); + } + } + + private ResultSet getNewScriptRS(String scriptDataField, String scriptTableName, String scriptIdField, + String scriptId, String scriptApplicationNameField, String applicationName) { + ResultSet rs = null; + String sqlCmd = StrUtil.format(NEW_SCRIPT_PATTERN, scriptDataField, scriptTableName, + scriptIdField, scriptApplicationNameField); + try{ + PreparedStatement stmt = conn.prepareStatement(sqlCmd, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + stmt.setString(1, scriptId); + stmt.setString(2, applicationName); + rs = stmt.executeQuery(); + }catch (Exception e) { + throw new ELSQLException(e.getMessage()); + } + return rs; + } + + private String getStringFromResultSet(ResultSet rs, String field) throws SQLException { + String data = rs.getString(field); + if (StrUtil.isBlank(data)) { + throw new ELSQLException(StrUtil.format("exist {} field value is empty", field)); + } + return data; + } + + /*script节点的修改/添加*/ + private void changeScriptNode(NodeSimpleVO nodeSimpleVO, String newValue) { + // 有语言类型 + if (StrUtil.isNotBlank(nodeSimpleVO.getLanguage())) { + LiteFlowNodeBuilder.createScriptNode() + .setId(nodeSimpleVO.getNodeId()) + .setType(NodeTypeEnum.getEnumByCode(nodeSimpleVO.getType())) + .setName(nodeSimpleVO.getName()) + .setScript(newValue) + .setLanguage(nodeSimpleVO.getLanguage()) + .build(); + } + // 没有语言类型 + else { + LiteFlowNodeBuilder.createScriptNode() + .setId(nodeSimpleVO.getNodeId()) + .setType(NodeTypeEnum.getEnumByCode(nodeSimpleVO.getType())) + .setName(nodeSimpleVO.getName()) + .setScript(newValue) + .build(); + } + } + + private NodeSimpleVO convert(String scriptKey){ + List matchItemList = ReUtil.findAllGroup0("(?<=[^:]:)[^:]+|[^:]+(?=:[^:])", scriptKey); + if (CollUtil.isEmpty(matchItemList)) { + return null; + } + NodeSimpleVO nodeSimpleVO = new NodeSimpleVO(); + if (matchItemList.size() > 1) { + nodeSimpleVO.setNodeId(matchItemList.get(0)); + nodeSimpleVO.setType(matchItemList.get(1)); + } + + if (matchItemList.size() > 2) { + nodeSimpleVO.setName(matchItemList.get(2)); + } + + if (matchItemList.size() > 3) { + nodeSimpleVO.setLanguage(matchItemList.get(3)); + } + + return nodeSimpleVO; + } + + class NodeSimpleVO { + + private String nodeId; + + private String type; + + private String name = StrUtil.EMPTY; + + private String language; + + public String getNodeId() { + return nodeId; + } + + public void setNodeId(String nodeId) { + this.nodeId = nodeId; + } + + public String getType() { + return type; + } + + public void setType(String type) { + this.type = type; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public String getLanguage() { + return language; + } + + public void setLanguage(String language) { + this.language = language; + } + } +} diff --git a/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/vo/SQLParserVO.java b/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/vo/SQLParserVO.java index 4246b3e2d..b2f9f3c16 100644 --- a/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/vo/SQLParserVO.java +++ b/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/vo/SQLParserVO.java @@ -93,8 +93,8 @@ public class SQLParserVO { /*是否开启轮询机制 默认不开启*/ private Boolean ifPolling = false; - /*轮询时间间隔(s) 默认60s*/ - private Integer pollingInterval = 60; + /*轮询时间间隔(s) 默认120s*/ + private Integer pollingInterval = 120; /*规则配置后首次轮询的起始时间 默认为60s*/ private Integer pollingStartTime = 60;