From d07768d2310383eee09277216cdddc7ee3ac6997 Mon Sep 17 00:00:00 2001 From: houxinyu Date: Sat, 16 Sep 2023 22:13:50 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0SQL=E8=BD=AE=E8=AF=A2chain?= =?UTF-8?q?=E9=83=A8=E5=88=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../liteflow-rule-sql/pom.xml | 6 + .../liteflow/parser/sql/SQLXmlELParser.java | 19 ++- .../parser/sql/util/ChainPollingTask.java | 149 ++++++++++++++++++ .../liteflow/parser/sql/util/JDBCHelper.java | 52 +++++- .../liteflow/parser/sql/vo/SQLParserVO.java | 32 ++++ 5 files changed, 253 insertions(+), 5 deletions(-) create mode 100644 liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/util/ChainPollingTask.java diff --git a/liteflow-rule-plugin/liteflow-rule-sql/pom.xml b/liteflow-rule-plugin/liteflow-rule-sql/pom.xml index e4933af70..f725f7cb8 100644 --- a/liteflow-rule-plugin/liteflow-rule-sql/pom.xml +++ b/liteflow-rule-plugin/liteflow-rule-sql/pom.xml @@ -20,5 +20,11 @@ true provided + + + cn.hutool + hutool-crypto + ${hutool-crypto.version} + \ No newline at end of file diff --git a/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/SQLXmlELParser.java b/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/SQLXmlELParser.java index ce3bbe536..6f218de4a 100644 --- a/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/SQLXmlELParser.java +++ b/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/SQLXmlELParser.java @@ -5,6 +5,7 @@ import cn.hutool.core.bean.copier.CopyOptions; import cn.hutool.core.map.MapUtil; import cn.hutool.core.text.StrFormatter; import cn.hutool.core.util.StrUtil; +import com.yomahub.liteflow.core.FlowInitHook; import com.yomahub.liteflow.parser.el.ClassXmlFlowELParser; import com.yomahub.liteflow.parser.sql.exception.ELSQLException; import com.yomahub.liteflow.parser.sql.util.JDBCHelper; @@ -23,6 +24,8 @@ import java.util.Objects; */ public class SQLXmlELParser extends ClassXmlFlowELParser { + private static SQLParserVO sqlParserVO; + private static final String ERROR_MSG_PATTERN = "rule-source-ext-data {} is blank"; private static final String ERROR_COMMON_MSG = "rule-source-ext-data is empty"; @@ -34,7 +37,6 @@ public class SQLXmlELParser extends ClassXmlFlowELParser { LiteflowConfig liteflowConfig = LiteflowConfigGetter.get(); try { - SQLParserVO sqlParserVO = null; if (MapUtil.isNotEmpty((liteflowConfig.getRuleSourceExtDataMap()))) { sqlParserVO = BeanUtil.toBean(liteflowConfig.getRuleSourceExtDataMap(), SQLParserVO.class, CopyOptions.create()); @@ -63,7 +65,20 @@ public class SQLXmlELParser extends ClassXmlFlowELParser { @Override public String parseCustom() { - return JDBCHelper.getInstance().getContent(); + try{ + JDBCHelper jdbcHelper = JDBCHelper.getInstance(); + String content = jdbcHelper.getContent(); + if(sqlParserVO.getIfPolling()) { + FlowInitHook.addHook(() -> { + jdbcHelper.listenSQL(); + return true; + }); + } + return content; + } + catch (Exception ex) { + throw new ELSQLException(ex.getMessage()); + } } /** diff --git a/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/util/ChainPollingTask.java b/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/util/ChainPollingTask.java new file mode 100644 index 000000000..7082ee49c --- /dev/null +++ b/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/util/ChainPollingTask.java @@ -0,0 +1,149 @@ +package com.yomahub.liteflow.parser.sql.util; + +import cn.hutool.core.util.StrUtil; +import com.yomahub.liteflow.builder.el.LiteFlowChainELBuilder; +import com.yomahub.liteflow.flow.FlowBus; +import com.yomahub.liteflow.log.LFLog; +import com.yomahub.liteflow.log.LFLoggerManager; +import com.yomahub.liteflow.parser.sql.exception.ELSQLException; +import com.yomahub.liteflow.parser.sql.vo.SQLParserVO; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +/** + * 用于轮询chain的定时任务 + * + * @author hxinyu + * @since 2.11.1 + */ +public class ChainPollingTask implements Runnable { + + private static final String SQL_PATTERN = "SELECT {},{} FROM {} WHERE {}=?"; + + private static final String NEW_CHAIN_PATTERN = "SELECT {} FROM {} WHERE {}=? AND {}=?"; + + private static final String SHA_PATTERN = "SHA1({})"; + + public static Connection conn; + + private SQLParserVO sqlParserVO; + + private Map chainSHAMap; + + private static final Integer FETCH_SIZE_MAX = 1000; + + LFLog LOG = LFLoggerManager.getLogger(ChainPollingTask.class); + + public ChainPollingTask(SQLParserVO sqlParserVO, Map chainSHAMap) { + this.sqlParserVO = sqlParserVO; + this.chainSHAMap = chainSHAMap; + conn = LiteFlowJdbcUtil.getConn(sqlParserVO); + } + + @Override + public void run() { + try{ + PreparedStatement stmt; + ResultSet rs; + String chainTableName = sqlParserVO.getChainTableName(); + String elDataField = sqlParserVO.getElDataField(); + String chainNameField = sqlParserVO.getChainNameField(); + String chainApplicationNameField = sqlParserVO.getChainApplicationNameField(); + String applicationName = sqlParserVO.getApplicationName(); + + String SHAField = StrUtil.format(SHA_PATTERN, elDataField); + String sqlCmd = StrUtil.format(SQL_PATTERN, chainNameField, SHAField, chainTableName, + chainApplicationNameField); + stmt = conn.prepareStatement(sqlCmd, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + // 设置游标拉取数量 + stmt.setFetchSize(FETCH_SIZE_MAX); + stmt.setString(1, applicationName); + rs = stmt.executeQuery(); + + Set newChainSet = new HashSet<>(); + + while(rs.next()) { + String chainName = getStringFromResultSet(rs, chainNameField); + String newSHA = getStringFromResultSet(rs, SHAField); + newChainSet.add(chainName); + //如果封装的SHAMap中不存在该chain, 表示该chain为新增 + if(!chainSHAMap.containsKey(chainName)) { + //获取新chain结果 + ResultSet newChainRS = getNewChainRS(elDataField, chainTableName, chainNameField, + chainApplicationNameField, applicationName, chainName); + if(newChainRS.next()) { + String newELData = getStringFromResultSet(newChainRS, elDataField); + //新增chain + LiteFlowChainELBuilder.createChain().setChainId(chainName).setEL(newELData).build(); + LOG.info("starting reload flow config... create key={} new value={},", chainName, newELData); + //加入到shaMap + chainSHAMap.put(chainName, newSHA); + } + } + else if (!StrUtil.equals(newSHA, chainSHAMap.get(chainName))) { + //SHA值发生变化,表示该chain的值已被修改,重新拉取变化的chain + ResultSet newChainRS = getNewChainRS(elDataField, chainTableName, chainNameField, + chainApplicationNameField, applicationName, chainName); + if(newChainRS.next()) { + String newELData = getStringFromResultSet(newChainRS, elDataField); + //修改chain + LiteFlowChainELBuilder.createChain().setChainId(chainName).setEL(newELData).build(); + LOG.info("starting reload flow config... update key={} new value={},", chainName, newELData); + //修改shaMap + chainSHAMap.put(chainName, newSHA); + } + } + //SHA值无变化,表示该chain未改变 + } + + if(chainSHAMap.size() > newChainSet.size()) { + //如果遍历prepareStatement后修改过的SHAMap数量比最新chain总数多, 说明有两种情况: + // 1、删除了chain + // 2、修改了chainName:因为遍历到新的name时会加到SHAMap里,但没有机会删除旧的chain + // 3、上述两者结合 + //在此处遍历chainSHAMap,把不在newChainSet中的chain删除 + for (String chainName : chainSHAMap.keySet()) { + if(!newChainSet.contains(chainName)){ + FlowBus.removeChain(chainName); + LOG.info("starting reload flow config... delete chain={}", chainName); + //修改SHAMap + chainSHAMap.remove(chainName); + } + } + } + + }catch (Exception e) { + LOG.error("[Exception during SQL chain polling] " + e.getMessage(), e); + } + } + + private ResultSet getNewChainRS(String elDataField, String chainTableName, String chainNameField, + String chainApplicationNameField, String applicationName, String chainName) { + ResultSet rs = null; + String sqlCmd = StrUtil.format(NEW_CHAIN_PATTERN, elDataField, chainTableName, + chainNameField, chainApplicationNameField); + try{ + PreparedStatement stmt = conn.prepareStatement(sqlCmd, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + stmt.setString(1, chainName); + stmt.setString(2, applicationName); + rs = stmt.executeQuery(); + }catch (Exception e) { + throw new ELSQLException(e.getMessage()); + } + return rs; + } + + private String getStringFromResultSet(ResultSet rs, String field) throws SQLException { + String data = rs.getString(field); + if (StrUtil.isBlank(data)) { + throw new ELSQLException(StrUtil.format("exist {} field value is empty", field)); + } + return data; + } +} diff --git a/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/util/JDBCHelper.java b/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/util/JDBCHelper.java index 58d374b6e..250242b8f 100644 --- a/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/util/JDBCHelper.java +++ b/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/util/JDBCHelper.java @@ -1,8 +1,11 @@ package com.yomahub.liteflow.parser.sql.util; import cn.hutool.core.collection.CollUtil; +import cn.hutool.core.thread.NamedThreadFactory; +import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.XmlUtil; +import cn.hutool.crypto.digest.DigestUtil; import com.yomahub.liteflow.enums.NodeTypeEnum; import com.yomahub.liteflow.enums.ScriptTypeEnum; import com.yomahub.liteflow.parser.sql.exception.ELSQLException; @@ -12,9 +15,11 @@ import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; -import java.util.ArrayList; -import java.util.List; -import java.util.Objects; +import java.util.*; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; /** * jdbc 工具类 @@ -48,6 +53,15 @@ public class JDBCHelper { private static JDBCHelper INSTANCE; + //定时任务线程池核心线程数 + private static final int CORE_POOL_SIZE = 2; + + //定时任务线程池 + private static ScheduledThreadPoolExecutor pollExecutor; + + //chain的SHA1加密值 用于轮询时确定elDataField是否变化 + private Map chainSHAMap = new HashMap<>(); + /** * 初始化 INSTANCE */ @@ -58,6 +72,15 @@ public class JDBCHelper { Class.forName(sqlParserVO.getDriverClassName()); } INSTANCE.setSqlParserVO(sqlParserVO); + //创建定时任务线程池 + if (sqlParserVO.getIfPolling() && ObjectUtil.isNull(getPollExecutor())) { + ThreadFactory namedThreadFactory = new NamedThreadFactory("SQL-Polling-", false); + ScheduledThreadPoolExecutor threadPoolExecutor = new ScheduledThreadPoolExecutor( + CORE_POOL_SIZE, + namedThreadFactory, + new ThreadPoolExecutor.DiscardOldestPolicy()); + setPollExecutor(threadPoolExecutor); + } } catch (ClassNotFoundException e) { throw new ELSQLException(e.getMessage()); } @@ -109,6 +132,12 @@ public class JDBCHelper { String chainName = getStringFromResultSet(rs, chainNameField); result.add(StrUtil.format(CHAIN_XML_PATTERN, XmlUtil.escape(chainName), elData)); + + //如果需要轮询 计算该chainData的SHA值 + if(sqlParserVO.getIfPolling()){ + String chainSHA = DigestUtil.sha1Hex(elData); + chainSHAMap.put(chainName, chainSHA); + } } } catch (Exception e) { throw new ELSQLException(e.getMessage()); @@ -129,6 +158,16 @@ public class JDBCHelper { return StrUtil.format(XML_PATTERN, nodesContent, chainsContent); } + /** + * 定时轮询拉取SQL中变化的数据 + */ + public void listenSQL() { + ChainPollingTask chainTask = new ChainPollingTask(sqlParserVO, chainSHAMap); + pollExecutor.scheduleAtFixedRate(chainTask, sqlParserVO.getPollingStartTime().longValue(), + sqlParserVO.getPollingInterval().longValue(), TimeUnit.SECONDS); + } + + /** * 获取脚本节点内容 */ @@ -299,4 +338,11 @@ public class JDBCHelper { this.sqlParserVO = sqlParserVO; } + public static ScheduledThreadPoolExecutor getPollExecutor() { + return pollExecutor; + } + + public static void setPollExecutor(ScheduledThreadPoolExecutor pollExecutor) { + JDBCHelper.pollExecutor = pollExecutor; + } } diff --git a/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/vo/SQLParserVO.java b/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/vo/SQLParserVO.java index b549bafdc..4246b3e2d 100644 --- a/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/vo/SQLParserVO.java +++ b/liteflow-rule-plugin/liteflow-rule-sql/src/main/java/com/yomahub/liteflow/parser/sql/vo/SQLParserVO.java @@ -90,6 +90,15 @@ public class SQLParserVO { */ private String scriptLanguageField; + /*是否开启轮询机制 默认不开启*/ + private Boolean ifPolling = false; + + /*轮询时间间隔(s) 默认60s*/ + private Integer pollingInterval = 60; + + /*规则配置后首次轮询的起始时间 默认为60s*/ + private Integer pollingStartTime = 60; + public String getUrl() { return url; } @@ -225,4 +234,27 @@ public class SQLParserVO { return StrUtil.isBlank(url) && StrUtil.isBlank(username) && StrUtil.isBlank(password) && StrUtil.isBlank(driverClassName); } + public Boolean getIfPolling() { + return ifPolling; + } + + public void setIfPolling(Boolean ifPolling) { + this.ifPolling = ifPolling; + } + + public Integer getPollingInterval() { + return pollingInterval; + } + + public void setPollingInterval(Integer pollingInterval) { + this.pollingInterval = pollingInterval; + } + + public Integer getPollingStartTime() { + return pollingStartTime; + } + + public void setPollingStartTime(Integer pollingStartTime) { + this.pollingStartTime = pollingStartTime; + } }