mirror of
https://gitee.com/dromara/liteFlow.git
synced 2026-06-10 03:07:32 +08:00
增加SQL轮询测试用例
This commit is contained in:
@@ -29,7 +29,11 @@ public class ChainPollingTask implements Runnable {
|
||||
|
||||
private static final String NEW_CHAIN_PATTERN = "SELECT {} FROM {} WHERE {}=? AND {}=?";
|
||||
|
||||
private static final String SHA_PATTERN = "SHA1({})";
|
||||
private static final String SHA_PATTERN = "SHA1({}) AS SHA1";
|
||||
|
||||
private static final String SHA_PATTERN_FOR_H2 = "RAWTOHEX(HASH('SHA-1', {})) AS SHA1";
|
||||
|
||||
private static final String SHA_FIELD_NAME = "SHA1";
|
||||
|
||||
public static Connection conn;
|
||||
|
||||
@@ -57,6 +61,11 @@ public class ChainPollingTask implements Runnable {
|
||||
String applicationName = sqlParserVO.getApplicationName();
|
||||
|
||||
String SHAField = StrUtil.format(SHA_PATTERN, elDataField);
|
||||
//h2数据库计算SHA的函数与MySQL不同
|
||||
if(StrUtil.equals(sqlParserVO.getDriverClassName(), "org.h2.Driver")){
|
||||
SHAField = StrUtil.format(SHA_PATTERN_FOR_H2, elDataField);
|
||||
}
|
||||
|
||||
String sqlCmd = StrUtil.format(SQL_PATTERN, chainNameField, SHAField, chainTableName,
|
||||
chainApplicationNameField);
|
||||
PreparedStatement stmt = conn.prepareStatement(sqlCmd, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
|
||||
@@ -69,7 +78,7 @@ public class ChainPollingTask implements Runnable {
|
||||
|
||||
while(rs.next()) {
|
||||
String chainName = getStringFromResultSet(rs, chainNameField);
|
||||
String newSHA = getStringFromResultSet(rs, SHAField);
|
||||
String newSHA = getStringFromResultSet(rs, SHA_FIELD_NAME);
|
||||
newChainSet.add(chainName);
|
||||
//如果封装的SHAMap中不存在该chain, 表示该chain为新增
|
||||
if(!chainSHAMap.containsKey(chainName)) {
|
||||
@@ -80,7 +89,7 @@ public class ChainPollingTask implements Runnable {
|
||||
String newELData = getStringFromResultSet(newChainRS, elDataField);
|
||||
//新增chain
|
||||
LiteFlowChainELBuilder.createChain().setChainId(chainName).setEL(newELData).build();
|
||||
LOG.info("starting reload flow config... create chain={} new value={},", chainName, newELData);
|
||||
LOG.info("starting reload flow config... create chain={}, new value={},", chainName, newELData);
|
||||
//加入到shaMap
|
||||
chainSHAMap.put(chainName, newSHA);
|
||||
}
|
||||
@@ -93,7 +102,7 @@ public class ChainPollingTask implements Runnable {
|
||||
String newELData = getStringFromResultSet(newChainRS, elDataField);
|
||||
//修改chain
|
||||
LiteFlowChainELBuilder.createChain().setChainId(chainName).setEL(newELData).build();
|
||||
LOG.info("starting reload flow config... update chain={} new value={},", chainName, newELData);
|
||||
LOG.info("starting reload flow config... update chain={}, new value={},", chainName, newELData);
|
||||
//修改shaMap
|
||||
chainSHAMap.put(chainName, newSHA);
|
||||
}
|
||||
|
||||
@@ -172,7 +172,7 @@ public class JDBCHelper {
|
||||
if (hasScriptData()) {
|
||||
//添加轮询script的定时任务
|
||||
ScriptPollingTask scriptTask = new ScriptPollingTask(sqlParserVO, scriptSHAMap);
|
||||
pollExecutor.scheduleAtFixedRate(chainTask, sqlParserVO.getPollingStartTime().longValue(),
|
||||
pollExecutor.scheduleAtFixedRate(scriptTask, sqlParserVO.getPollingStartTime().longValue(),
|
||||
sqlParserVO.getPollingInterval().longValue(), TimeUnit.SECONDS);
|
||||
}
|
||||
}
|
||||
@@ -233,7 +233,7 @@ public class JDBCHelper {
|
||||
|
||||
//如果需要轮询 计算该scriptData的SHA值
|
||||
if(sqlParserVO.getIfPolling()){
|
||||
String scriptKey = StrUtil.join(":", id, name, type);
|
||||
String scriptKey = StrUtil.join(":", id, type, name);
|
||||
String scriptSHA = DigestUtil.sha1Hex(data);
|
||||
scriptSHAMap.put(scriptKey, scriptSHA);
|
||||
}
|
||||
@@ -307,7 +307,7 @@ public class JDBCHelper {
|
||||
|
||||
//如果需要轮询 计算该scriptData的SHA值
|
||||
if(sqlParserVO.getIfPolling()){
|
||||
String scriptKey = StrUtil.join(":", id, name, type, language);
|
||||
String scriptKey = StrUtil.join(":", id, type, name, language);
|
||||
String scriptSHA = DigestUtil.sha1Hex(data);
|
||||
scriptSHAMap.put(scriptKey, scriptSHA);
|
||||
}
|
||||
|
||||
@@ -33,7 +33,11 @@ public class ScriptPollingTask implements Runnable {
|
||||
|
||||
private static final String CONCAT_WITH_LANGUAGE_PATTERN = "CONCAT_WS(':',{},{},{},{}) as script_concat";
|
||||
|
||||
private static final String SHA_PATTERN = "SHA1({})";
|
||||
private static final String SHA_PATTERN = "SHA1({}) AS SHA1";
|
||||
|
||||
private static final String SHA_PATTERN_FOR_H2 = "RAWTOHEX(HASH('SHA-1', {})) AS SHA1";
|
||||
|
||||
private static final String SHA_FIELD_NAME = "SHA1";
|
||||
|
||||
private static final String SCRIPT_KEY_FIELD = "script_concat";
|
||||
|
||||
@@ -67,11 +71,16 @@ public class ScriptPollingTask implements Runnable {
|
||||
String scriptLanguageField = sqlParserVO.getScriptLanguageField();
|
||||
|
||||
String SHAField = StrUtil.format(SHA_PATTERN, scriptDataField);
|
||||
//h2数据库计算SHA的函数与MySQL不同
|
||||
if(StrUtil.equals(sqlParserVO.getDriverClassName(), "org.h2.Driver")){
|
||||
SHAField = StrUtil.format(SHA_PATTERN_FOR_H2, scriptDataField);
|
||||
}
|
||||
|
||||
String KeyField;
|
||||
if (StrUtil.isNotBlank(scriptLanguageField)) {
|
||||
KeyField = StrUtil.format(CONCAT_WITH_LANGUAGE_PATTERN, scriptIdField, scriptNameField, scriptTypeField, scriptLanguageField);
|
||||
KeyField = StrUtil.format(CONCAT_WITH_LANGUAGE_PATTERN, scriptIdField, scriptTypeField, scriptNameField, scriptLanguageField);
|
||||
} else {
|
||||
KeyField = StrUtil.format(CONCAT_PATTERN, scriptIdField, scriptNameField, scriptTypeField);
|
||||
KeyField = StrUtil.format(CONCAT_PATTERN, scriptIdField, scriptTypeField, scriptNameField);
|
||||
}
|
||||
|
||||
String sqlCmd = StrUtil.format(SQL_PATTERN, KeyField, SHAField, scriptTableName, scriptApplicationNameField);
|
||||
@@ -85,7 +94,7 @@ public class ScriptPollingTask implements Runnable {
|
||||
|
||||
while (rs.next()) {
|
||||
String scriptKey = getStringFromResultSet(rs, SCRIPT_KEY_FIELD);
|
||||
String newSHA = getStringFromResultSet(rs, SHAField);
|
||||
String newSHA = getStringFromResultSet(rs, SHA_FIELD_NAME);
|
||||
newScriptSet.add(scriptKey);
|
||||
//如果封装的SHAMap中不存在该script 表示该script为新增
|
||||
if (!scriptSHAMap.containsKey(scriptKey)) {
|
||||
@@ -97,7 +106,7 @@ public class ScriptPollingTask implements Runnable {
|
||||
String newScriptData = getStringFromResultSet(newScriptRS, scriptDataField);
|
||||
//新增script
|
||||
changeScriptNode(scriptVO, newScriptData);
|
||||
LOG.info("starting reload flow config... create script={} new value={},", scriptKey, newScriptData);
|
||||
LOG.info("starting reload flow config... create script={}, new value={},", scriptKey, newScriptData);
|
||||
}
|
||||
//加入到shaMap
|
||||
scriptSHAMap.put(scriptKey, newSHA);
|
||||
@@ -112,7 +121,7 @@ public class ScriptPollingTask implements Runnable {
|
||||
String newScriptData = getStringFromResultSet(newScriptRS, scriptDataField);
|
||||
//修改script
|
||||
changeScriptNode(scriptVO, newScriptData);
|
||||
LOG.info("starting reload flow config... update scriptId={} new value={},", scriptVO.getNodeId(), newScriptData);
|
||||
LOG.info("starting reload flow config... update scriptId={}, new value={},", scriptVO.getNodeId(), newScriptData);
|
||||
}
|
||||
//修改shaMap
|
||||
scriptSHAMap.put(scriptKey, newSHA);
|
||||
|
||||
@@ -0,0 +1,181 @@
|
||||
package com.yomahub.liteflow.test.sql;
|
||||
|
||||
import com.yomahub.liteflow.core.FlowExecutor;
|
||||
import com.yomahub.liteflow.core.FlowInitHook;
|
||||
import com.yomahub.liteflow.flow.FlowBus;
|
||||
import com.yomahub.liteflow.flow.LiteflowResponse;
|
||||
import com.yomahub.liteflow.log.LFLog;
|
||||
import com.yomahub.liteflow.log.LFLoggerManager;
|
||||
import com.yomahub.liteflow.parser.sql.exception.ELSQLException;
|
||||
import com.yomahub.liteflow.parser.sql.util.JDBCHelper;
|
||||
import com.yomahub.liteflow.parser.sql.vo.SQLParserVO;
|
||||
import com.yomahub.liteflow.property.LiteflowConfig;
|
||||
import com.yomahub.liteflow.property.LiteflowConfigGetter;
|
||||
import com.yomahub.liteflow.slot.DefaultContext;
|
||||
import com.yomahub.liteflow.spi.holder.SpiFactoryCleaner;
|
||||
import com.yomahub.liteflow.spring.ComponentScanner;
|
||||
import com.yomahub.liteflow.test.BaseTest;
|
||||
import com.yomahub.liteflow.thread.ExecutorHelper;
|
||||
import com.yomahub.liteflow.util.JsonUtil;
|
||||
import org.junit.jupiter.api.AfterAll;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
import org.springframework.context.annotation.ComponentScan;
|
||||
import org.springframework.test.context.TestPropertySource;
|
||||
import org.springframework.test.context.junit.jupiter.SpringExtension;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.lang.reflect.Field;
|
||||
import java.sql.*;
|
||||
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||
|
||||
/**
|
||||
* @author hxinyu
|
||||
* @since 2.11.1
|
||||
*/
|
||||
@ExtendWith(SpringExtension.class)
|
||||
@TestPropertySource(value = "classpath:/application-poll-xml.properties")
|
||||
@SpringBootTest(classes = SQLWithXmlELSpringbootPollingTest.class)
|
||||
@EnableAutoConfiguration
|
||||
@ComponentScan({ "com.yomahub.liteflow.test.sql.cmp" })
|
||||
public class SQLWithXmlELSpringbootPollingTest extends BaseTest {
|
||||
|
||||
@Resource
|
||||
private FlowExecutor flowExecutor;
|
||||
|
||||
static LFLog LOG = LFLoggerManager.getLogger(SQLWithXmlELSpringbootPollingTest.class);
|
||||
|
||||
@AfterAll
|
||||
public static void after(){
|
||||
try{
|
||||
//关闭定时轮询线程池
|
||||
Field pollExecutor = JDBCHelper.class.getDeclaredField("pollExecutor");
|
||||
pollExecutor.setAccessible(true);
|
||||
ScheduledThreadPoolExecutor threadPoolExecutor = (ScheduledThreadPoolExecutor) pollExecutor.get(null);
|
||||
threadPoolExecutor.shutdownNow();
|
||||
LOG.info("[SQL Polling thread pool closed]");
|
||||
}catch (Exception ignored) {
|
||||
LOG.error("[SQL Polling thread pool not closed]", ignored);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSQLWithXml() throws InterruptedException {
|
||||
LiteflowResponse response = flowExecutor.execute2Resp("chain1", "arg");
|
||||
Assertions.assertEquals("a==>b==>c", response.getExecuteStepStr());
|
||||
|
||||
// 修改chain
|
||||
changeData();
|
||||
Thread.sleep(4000);
|
||||
Assertions.assertEquals("a==>c==>b", flowExecutor.execute2Resp("chain1", "arg").getExecuteStepStr());
|
||||
|
||||
// 新增chain
|
||||
insertData();
|
||||
Thread.sleep(4000);
|
||||
Assertions.assertEquals("a==>b", flowExecutor.execute2Resp("chain5", "arg").getExecuteStepStr());
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testSQLWithScriptXml() throws InterruptedException {
|
||||
LiteflowResponse response = flowExecutor.execute2Resp("chain2", "arg");
|
||||
Assertions.assertTrue(response.isSuccess());
|
||||
Assertions.assertEquals("x1[if 脚本]==>a==>b", response.getExecuteStepStrWithoutTime());
|
||||
|
||||
// 修改script
|
||||
changeScriptData();
|
||||
Thread.sleep(4000);
|
||||
Assertions.assertEquals("x1[if 脚本]", flowExecutor.execute2Resp("chain2", "arg").getExecuteStepStr());
|
||||
|
||||
// 新増script
|
||||
insertScriptData();
|
||||
Thread.sleep(4000);
|
||||
response = flowExecutor.execute2Resp("chain6", "arg");
|
||||
DefaultContext context = response.getFirstContextBean();
|
||||
Assertions.assertEquals("a==>x3[x3脚本]", response.getExecuteStepStrWithoutTime());
|
||||
Assertions.assertEquals("hello", context.getData("test"));
|
||||
}
|
||||
|
||||
/**
|
||||
* 修改chain数据
|
||||
*/
|
||||
private void changeData() {
|
||||
LiteflowConfig liteflowConfig = LiteflowConfigGetter.get();
|
||||
SQLParserVO sqlParserVO = JsonUtil.parseObject(liteflowConfig.getRuleSourceExtData(), SQLParserVO.class);
|
||||
Connection connection;
|
||||
try {
|
||||
connection = DriverManager.getConnection(sqlParserVO.getUrl(), sqlParserVO.getUsername(),
|
||||
sqlParserVO.getPassword());
|
||||
Statement statement = connection.createStatement();
|
||||
statement.executeUpdate("UPDATE EL_TABLE SET EL_DATA='THEN(a, c, b);' WHERE chain_name='chain1'");
|
||||
}
|
||||
catch (SQLException e) {
|
||||
throw new ELSQLException(e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 增加chain数据
|
||||
*/
|
||||
private void insertData() {
|
||||
LiteflowConfig liteflowConfig = LiteflowConfigGetter.get();
|
||||
SQLParserVO sqlParserVO = JsonUtil.parseObject(liteflowConfig.getRuleSourceExtData(), SQLParserVO.class);
|
||||
Connection connection;
|
||||
try {
|
||||
connection = DriverManager.getConnection(sqlParserVO.getUrl(), sqlParserVO.getUsername(),
|
||||
sqlParserVO.getPassword());
|
||||
Statement statement = connection.createStatement();
|
||||
statement.executeUpdate("INSERT INTO EL_TABLE (APPLICATION_NAME,CHAIN_NAME,EL_DATA) values ('demo','chain5','THEN(a, b);')");
|
||||
}
|
||||
catch (SQLException e) {
|
||||
throw new ELSQLException(e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 修改脚本数据
|
||||
*/
|
||||
private void changeScriptData() {
|
||||
LiteflowConfig liteflowConfig = LiteflowConfigGetter.get();
|
||||
SQLParserVO sqlParserVO = JsonUtil.parseObject(liteflowConfig.getRuleSourceExtData(), SQLParserVO.class);
|
||||
Connection connection;
|
||||
try {
|
||||
connection = DriverManager.getConnection(sqlParserVO.getUrl(), sqlParserVO.getUsername(),
|
||||
sqlParserVO.getPassword());
|
||||
Statement statement = connection.createStatement();
|
||||
//修改script data
|
||||
statement.executeUpdate(
|
||||
"UPDATE SCRIPT_NODE_TABLE SET SCRIPT_NODE_DATA='return false;' WHERE SCRIPT_NODE_ID='x1'");
|
||||
//修改script名
|
||||
statement.executeUpdate(
|
||||
"UPDATE SCRIPT_NODE_TABLE SET SCRIPT_NODE_NAME='x0_script' WHERE SCRIPT_NODE_ID='x0'");
|
||||
}
|
||||
catch (SQLException e) {
|
||||
throw new ELSQLException(e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 增加脚本数据
|
||||
*/
|
||||
private void insertScriptData() {
|
||||
LiteflowConfig liteflowConfig = LiteflowConfigGetter.get();
|
||||
SQLParserVO sqlParserVO = JsonUtil.parseObject(liteflowConfig.getRuleSourceExtData(), SQLParserVO.class);
|
||||
Connection connection;
|
||||
try {
|
||||
connection = DriverManager.getConnection(sqlParserVO.getUrl(), sqlParserVO.getUsername(),
|
||||
sqlParserVO.getPassword());
|
||||
Statement statement = connection.createStatement();
|
||||
statement.executeUpdate(
|
||||
"INSERT INTO SCRIPT_NODE_TABLE (APPLICATION_NAME,SCRIPT_NODE_ID,SCRIPT_NODE_NAME,SCRIPT_NODE_TYPE,SCRIPT_NODE_DATA,SCRIPT_LANGUAGE) values ('demo','x3','x3脚本','script','defaultContext.setData(\"test\",\"hello\");','groovy');");
|
||||
statement.executeUpdate(
|
||||
"INSERT INTO EL_TABLE (APPLICATION_NAME,CHAIN_NAME,EL_DATA) values ('demo','chain6','THEN(a, x3);');");
|
||||
}
|
||||
catch (SQLException e) {
|
||||
throw new ELSQLException(e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,29 @@
|
||||
liteflow.rule-source-ext-data={\
|
||||
"url":"jdbc:h2:mem:test_db;MODE=MySQL",\
|
||||
"driverClassName":"org.h2.Driver",\
|
||||
"username":"root",\
|
||||
"password":"123456",\
|
||||
"applicationName":"demo",\
|
||||
"chainTableName":"EL_TABLE",\
|
||||
"chainApplicationNameField":"application_name",\
|
||||
"chainNameField":"chain_name",\
|
||||
"elDataField":"EL_DATA",\
|
||||
"scriptTableName":"script_node_table",\
|
||||
"scriptApplicationNameField":"application_name",\
|
||||
"scriptIdField":"script_node_id",\
|
||||
"scriptNameField":"script_node_name",\
|
||||
"scriptDataField":"script_node_data",\
|
||||
"scriptLanguageField":"script_language",\
|
||||
"scriptTypeField":"script_node_type",\
|
||||
"ifPolling":true,\
|
||||
"pollingInterval":2,\
|
||||
"pollingStartTime":2\
|
||||
}
|
||||
|
||||
spring.datasource.driver-class-name=org.h2.Driver
|
||||
spring.datasource.url=jdbc:h2:mem:test_db;MODE=MySQL
|
||||
spring.datasource.username=root
|
||||
spring.datasource.password=123456
|
||||
spring.datasource.schema=classpath:/sql/schema.sql
|
||||
spring.datasource.data=classpath:/sql/data.sql
|
||||
spring.datasource.platform=h2
|
||||
@@ -0,0 +1,14 @@
|
||||
DELETE FROM EL_TABLE;
|
||||
|
||||
INSERT INTO EL_TABLE (APPLICATION_NAME,CHAIN_NAME,EL_DATA) values ('demo','chain1','THEN(a, b, c);');
|
||||
INSERT INTO EL_TABLE (APPLICATION_NAME,CHAIN_NAME,EL_DATA) values ('demo','chain2','IF(x1, THEN(a, b));');
|
||||
INSERT INTO EL_TABLE (APPLICATION_NAME,CHAIN_NAME,EL_DATA) values ('demo','chain3','IF(x0, THEN(a, b));');
|
||||
INSERT INTO EL_TABLE (APPLICATION_NAME,CHAIN_NAME,EL_DATA) values ('demo','<chain3>','IF(x0, THEN(a, b));');
|
||||
INSERT INTO EL_TABLE (APPLICATION_NAME,CHAIN_NAME,EL_DATA) values ('demo','chain4','IF(x2, IF(x0, THEN(a, b)));');
|
||||
|
||||
DELETE FROM SCRIPT_NODE_TABLE;
|
||||
|
||||
INSERT INTO SCRIPT_NODE_TABLE (APPLICATION_NAME,SCRIPT_NODE_ID,SCRIPT_NODE_NAME,SCRIPT_NODE_TYPE,SCRIPT_NODE_DATA,SCRIPT_LANGUAGE) values ('demo','x0','if 脚本','if_script','return true','groovy');
|
||||
INSERT INTO SCRIPT_NODE_TABLE (APPLICATION_NAME,SCRIPT_NODE_ID,SCRIPT_NODE_NAME,SCRIPT_NODE_TYPE,SCRIPT_NODE_DATA,SCRIPT_LANGUAGE) values ('demo','x1','if 脚本','if_script','return true','groovy');
|
||||
|
||||
INSERT INTO SCRIPT_NODE_TABLE (APPLICATION_NAME,SCRIPT_NODE_ID,SCRIPT_NODE_NAME,SCRIPT_NODE_TYPE,SCRIPT_NODE_DATA,SCRIPT_LANGUAGE) values ('demo','x2','python脚本','if_script','return true','js');
|
||||
@@ -0,0 +1,20 @@
|
||||
create table IF NOT EXISTS `EL_TABLE`
|
||||
(
|
||||
`id` bigint NOT NULL GENERATED BY DEFAULT AS IDENTITY,
|
||||
`application_name` varchar(32) NOT NULL,
|
||||
`chain_name` varchar(32) NOT NULL,
|
||||
`el_data` varchar(1024) NOT NULL,
|
||||
PRIMARY KEY (`id`)
|
||||
);
|
||||
|
||||
create table IF NOT EXISTS `script_node_table`
|
||||
(
|
||||
`id` bigint NOT NULL GENERATED BY DEFAULT AS IDENTITY,
|
||||
`application_name` varchar(32) NOT NULL,
|
||||
`script_node_id` varchar(32) NOT NULL,
|
||||
`script_node_name` varchar(32) NOT NULL,
|
||||
`script_node_type` varchar(32) NOT NULL,
|
||||
`script_node_data` varchar(1024) NOT NULL,
|
||||
`script_language` varchar(1024) NOT NULL,
|
||||
PRIMARY KEY (`id`)
|
||||
);
|
||||
Reference in New Issue
Block a user