#IB0SJ1 调整逻辑

This commit is contained in:
jay li
2024-12-04 22:46:59 +08:00
parent c275464675
commit 882d3d1491
10 changed files with 139 additions and 85 deletions

View File

@@ -0,0 +1,51 @@
package com.yomahub.liteflow.flow.entity;
/**
* sInstanceId
*
* @author jay li
* @since 2.13.0
*/
public class InstanceIdDto {
// a_XXX_0
// {"chainId":"chain1","nodeId":"a","instanceId":"XXXX","index":0},
private String chainId;
private String nodeId;
private String instanceId;
private Integer index;
public String getChainId() {
return chainId;
}
public void setChainId(String chainId) {
this.chainId = chainId;
}
public String getNodeId() {
return nodeId;
}
public void setNodeId(String nodeId) {
this.nodeId = nodeId;
}
public String getInstanceId() {
return instanceId;
}
public void setInstanceId(String instanceId) {
this.instanceId = instanceId;
}
public Integer getIndex() {
return index;
}
public void setIndex(Integer index) {
this.index = index;
}
}

View File

@@ -2,17 +2,15 @@ package com.yomahub.liteflow.flow.instanceId;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.crypto.digest.MD5;
import com.fasterxml.jackson.databind.JsonNode;
import com.yomahub.liteflow.flow.element.Chain;
import com.yomahub.liteflow.flow.element.Condition;
import com.yomahub.liteflow.flow.element.Node;
import com.yomahub.liteflow.flow.entity.InstanceIdDto;
import org.apache.commons.lang.StringUtils;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import static com.yomahub.liteflow.util.JsonUtil.parseObject;
import static com.yomahub.liteflow.util.JsonUtil.toJsonString;
import static com.yomahub.liteflow.util.JsonUtil.*;
import static com.yomahub.liteflow.util.SerialsUtil.generateShortUUID;
/**
@@ -31,25 +29,14 @@ public abstract class BaseNodeInstanceIdManageSpi implements NodeInstanceIdManag
List<String> instanceIdFile = readInstanceIdFile(chainId);
for (int i = 1; i < instanceIdFile.size(); i++) {
JsonNode groupKeyAndInstanceIds = parseObject(instanceIdFile.get(i));
if (groupKeyAndInstanceIds == null) {
List<InstanceIdDto> instanceIdDtos = parseList(instanceIdFile.get(i), InstanceIdDto.class);
if (instanceIdDtos == null) {
continue;
}
Iterator<String> fieldNames = groupKeyAndInstanceIds.fieldNames();
while (fieldNames.hasNext()) {
String key = fieldNames.next();
JsonNode valueNode = groupKeyAndInstanceIds.get(key);
if (valueNode.isArray()) {
Map<String, Integer> map = new HashMap<>();
for (int j = 1; j < valueNode.size(); j+=2) {
String nodeId = valueNode.get(j - 1).asText();
map.put(nodeId, map.getOrDefault(nodeId, -1)+1);
if (instanceId.equals(valueNode.get(j).asText())) {
return nodeId + "(" + (map.get(nodeId)) + ")";
}
}
for (InstanceIdDto dto : instanceIdDtos) {
if (Objects.equals(dto.getInstanceId(), instanceId)) {
return dto.getNodeId() + "(" + dto.getIndex() + ")";
}
}
}
@@ -67,37 +54,33 @@ public abstract class BaseNodeInstanceIdManageSpi implements NodeInstanceIdManag
// 如果文件不存在或者文件内容不是当前el则写入
if (CollUtil.isEmpty(instanceIdFile) || !instanceIdFile.get(0).equals(elMd5)) {
nodeInstanceIdManageSpi.writeInstanceIdFile(writeNodeInstanceId(condition, elMd5), chain.getChainId());
nodeInstanceIdManageSpi.writeInstanceIdFile(writeNodeInstanceId(condition), elMd5, chain.getChainId());
} else {
// 文件存在,则直接读取
Map<String, List<String>> executableMap = new HashMap<>();
List<InstanceIdDto> instanceIdDtos = new ArrayList<>();
for (int i = 1; i < instanceIdFile.size(); i++) {
JsonNode groupKeyAndInstanceIds = parseObject(instanceIdFile.get(i));
if (groupKeyAndInstanceIds == null) {
continue;
}
Iterator<String> fieldNames = groupKeyAndInstanceIds.fieldNames();
while (fieldNames.hasNext()) {
String key = fieldNames.next();
JsonNode valueNode = groupKeyAndInstanceIds.get(key);
if (valueNode.isArray()) {
List<String> valueList = new ArrayList<>();
for (int j = 1; j < valueNode.size(); j+=2) {
valueList.add(valueNode.get(j).asText());
}
executableMap.put(key, valueList);
}
List<InstanceIdDto> instanceIdDtos1 = parseList(instanceIdFile.get(i), InstanceIdDto.class);
if (instanceIdDtos1 != null) {
instanceIdDtos.addAll(instanceIdDtos1);
}
}
condition.getExecutableGroup().forEach((key, executables) -> {
AtomicInteger index = new AtomicInteger(0);
Map<String, Integer> idCntMap = new HashMap<>();
executables.forEach(executable -> {
if (executableMap.containsKey(key)) {
if (executable instanceof Node) {
((Node) executable).setInstanceId((executableMap.get(key).get(index.getAndIncrement())));
if (executable instanceof Node) {
Node node = (Node) executable;
idCntMap.put(node.getId(), idCntMap.getOrDefault(node.getId(), -1) + 1);
for (InstanceIdDto dto : instanceIdDtos) {
if (Objects.equals(dto.getNodeId(), node.getId())
&& Objects.equals(dto.getChainId(), node.getCurrChainId())
&& Objects.equals(dto.getIndex(), idCntMap.get(node.getId()))) {
node.setInstanceId(dto.getInstanceId());
break;
}
}
}
});
});
@@ -105,25 +88,37 @@ public abstract class BaseNodeInstanceIdManageSpi implements NodeInstanceIdManag
}
// 写入时第一行为el的md5第二行为json格式的groupKey和对应的nodeId 和实例id
private List<String> writeNodeInstanceId(Condition condition, String elMd5) {
ArrayList<String> writeList = new ArrayList<>();
writeList.add(elMd5);
Map<String, List<String>> groupKeyAndInstanceIds = new HashMap<>();
// instanceId a_XXX_0
// {"chainId":"chain1","nodeId":"a","instanceId":"XXXX","index":0},
private List<InstanceIdDto> writeNodeInstanceId(Condition condition) {
ArrayList<InstanceIdDto> instanceIdDtos = new ArrayList<>();
condition.getExecutableGroup().forEach((key, executables) -> {
List<String> instanceIds = new ArrayList<>();
Map<String, Integer> idCntMap = new HashMap<>();
executables.forEach(executable -> {
if (executable instanceof Node) {
((Node) executable).setInstanceId(generateShortUUID());
instanceIds.add(executable.getId());
instanceIds.add(((Node) executable).getInstanceId());
Node node = (Node) executable;
InstanceIdDto instanceIdDto = new InstanceIdDto();
instanceIdDto.setChainId(node.getCurrChainId());
instanceIdDto.setNodeId(node.getId());
String shortUUID = generateShortUUID();
idCntMap.put(node.getId(), idCntMap.getOrDefault(node.getId(), -1) + 1);
String instanceId = node.getId() + "_" + shortUUID + "_" + idCntMap.get(node.getId());
node.setInstanceId(instanceId);
instanceIdDto.setInstanceId(instanceId);
instanceIdDto.setIndex(idCntMap.get(node.getId()));
instanceIdDtos.add(instanceIdDto);
}
});
groupKeyAndInstanceIds.put(key, instanceIds);
});
writeList.add(toJsonString(groupKeyAndInstanceIds));
return writeList;
return instanceIdDtos;
}
}

View File

@@ -3,6 +3,8 @@ package com.yomahub.liteflow.flow.instanceId;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.io.FileUtil;
import cn.hutool.core.util.CharsetUtil;
import com.yomahub.liteflow.flow.entity.InstanceIdDto;
import com.yomahub.liteflow.util.JsonUtil;
import org.apache.commons.lang.StringUtils;
import java.io.File;
@@ -33,13 +35,16 @@ public class DefaultNodeInstanceIdManageSpiImpl extends BaseNodeInstanceIdManage
}
@Override
public void writeInstanceIdFile(List<String> instanceIdList, String chainId) {
public void writeInstanceIdFile(List<InstanceIdDto> instanceIdList, String elMd5, String chainId) {
if (StringUtils.isBlank(chainId) || CollUtil.isEmpty(instanceIdList)) {
return;
}
File nodeDir = new File(basePath + chainId);
List<String> writeContent = new ArrayList<>();
writeContent.add(elMd5);
writeContent.add(JsonUtil.toJsonString(instanceIdList));
FileUtil.writeLines(instanceIdList, nodeDir.getPath(), CharsetUtil.UTF_8);
FileUtil.writeLines(writeContent, nodeDir.getPath(), CharsetUtil.UTF_8);
}
}

View File

@@ -2,6 +2,7 @@ package com.yomahub.liteflow.flow.instanceId;
import com.yomahub.liteflow.flow.element.Chain;
import com.yomahub.liteflow.flow.element.Condition;
import com.yomahub.liteflow.flow.entity.InstanceIdDto;
import java.util.List;
@@ -17,7 +18,7 @@ public interface NodeInstanceIdManageSpi {
List<String> readInstanceIdFile(String chainId);
// 写入文件保存
void writeInstanceIdFile(List<String> instanceIdList, String chainId);
void writeInstanceIdFile(List<InstanceIdDto> instanceIdList, String elMd5, String chainId);
// 根据实例id获取 节点实例定位
String getNodeInstanceLocationById(String chainId, String instanceId);

View File

@@ -18,13 +18,13 @@ public class SqlReadConstant {
public static final String INSTANT_CREATE_TABLE_SQL = "create table IF NOT EXISTS `node_instance_id_table`\n" +
"(\n" +
" `id` bigint NOT NULL GENERATED BY DEFAULT AS IDENTITY,\n" +
" `application_name` varchar(32) NOT NULL,\n" +
" `chain_id` varchar(32) NOT NULL,\n" +
" `el_data_md5` varchar(128) NOT NULL,\n" +
" `group_key_instance_id` varchar(1024) NOT NULL,\n" +
" PRIMARY KEY (`id`)\n" +
" `node_instance_id_map_json` varchar(1024) NOT NULL\n" +
");";
// a_XXX_0
// {"chainId":"chain1","nodeId":"a","instanceId":"XXXX","index":0},
public static final String SQL_PATTERN = "SELECT * FROM {} WHERE {}='{}'";

View File

@@ -2,6 +2,7 @@ package com.yomahub.liteflow.parser.spi.instanceId;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.util.StrUtil;
import com.yomahub.liteflow.flow.entity.InstanceIdDto;
import com.yomahub.liteflow.flow.instanceId.BaseNodeInstanceIdManageSpi;
import com.yomahub.liteflow.parser.constant.ReadType;
import com.yomahub.liteflow.parser.constant.SqlReadConstant;
@@ -10,6 +11,7 @@ import com.yomahub.liteflow.parser.sql.read.SqlReadFactory;
import com.yomahub.liteflow.parser.sql.read.vo.InstanceIdVO;
import com.yomahub.liteflow.parser.sql.util.JDBCHelper;
import com.yomahub.liteflow.parser.sql.vo.SQLParserVO;
import com.yomahub.liteflow.util.JsonUtil;
import java.util.Arrays;
import java.util.Collections;
@@ -30,22 +32,22 @@ public class SqlNodeInstanceIdManageSpiImpl extends BaseNodeInstanceIdManageSpi
List<InstanceIdVO> readResult = insIdRead.read(chainId);
if (CollectionUtil.isNotEmpty(readResult)) {
return Arrays.asList(readResult.get(0).getElDataMd5(), readResult.get(0).getGroupKeyInstanceId());
return Arrays.asList(readResult.get(0).getElDataMd5(), readResult.get(0).getNodeInstanceIdMapJson());
}
return Collections.emptyList();
}
@Override
public void writeInstanceIdFile(List<String> instanceIdList, String chainId) {
public void writeInstanceIdFile(List<InstanceIdDto> instanceIdList, String elMd5, String chainId) {
JDBCHelper jdbcHelper = JDBCHelper.getInstance();
SQLParserVO conf = jdbcHelper.getSqlParserVO();
String insertSql = StrUtil.format(SqlReadConstant.INSTANT_INSERT_SQL, conf.getInstanceIdTableName(), conf.getInstanceIdApplicationNameField(),
conf.getGroupKeyInstanceIdField(), conf.getElDataMd5Field(), conf.getInstanceChainIdField(), conf.getApplicationName(), instanceIdList.get(1),
instanceIdList.get(0), chainId);
String updateSql = StrUtil.format(SqlReadConstant.INSTANT_UPDATE_SQL, conf.getInstanceIdTableName(), conf.getElDataMd5Field(), instanceIdList.get(0),
conf.getGroupKeyInstanceIdField(), instanceIdList.get(1), conf.getChainNameField(), chainId, conf.getInstanceIdApplicationNameField(), conf.getApplicationName());
conf.getNodeInstanceIdMapJsondField(), conf.getElDataMd5Field(), conf.getInstanceChainIdField(), conf.getApplicationName(), JsonUtil.toJsonString(instanceIdList),
elMd5, chainId);
String updateSql = StrUtil.format(SqlReadConstant.INSTANT_UPDATE_SQL, conf.getInstanceIdTableName(), conf.getElDataMd5Field(), elMd5,
conf.getNodeInstanceIdMapJsondField(), JsonUtil.toJsonString(instanceIdList), conf.getChainNameField(), chainId, conf.getInstanceIdApplicationNameField(), conf.getApplicationName());
String selectSql = StrUtil.format(SqlReadConstant.INSTANT_SELECT_SQL, conf.getInstanceIdTableName(), conf.getInstanceChainIdField(), chainId,
conf.getInstanceIdApplicationNameField(), conf.getApplicationName());

View File

@@ -28,7 +28,7 @@ public class InstanceIdRead extends AbstractSqlRead<InstanceIdVO> {
InstanceIdVO idVO = new InstanceIdVO();
idVO.setChainId(getStringFromRsWithCheck(rs, super.config.getInstanceChainIdField()));
idVO.setElDataMd5(getStringFromRsWithCheck(rs, super.config.getElDataMd5Field()));
idVO.setGroupKeyInstanceId(getStringFromRsWithCheck(rs, super.config.getGroupKeyInstanceIdField()));
idVO.setNodeInstanceIdMapJson(getStringFromRsWithCheck(rs, super.config.getNodeInstanceIdMapJsondField()));
return idVO;
}

View File

@@ -11,7 +11,7 @@ public class InstanceIdVO {
private String elDataMd5;
private String groupKeyInstanceId;
private String nodeInstanceIdMapJson;
public String getChainId() {
return chainId;
@@ -29,11 +29,11 @@ public class InstanceIdVO {
this.elDataMd5 = elDataMd5;
}
public String getGroupKeyInstanceId() {
return groupKeyInstanceId;
public String getNodeInstanceIdMapJson() {
return nodeInstanceIdMapJson;
}
public void setGroupKeyInstanceId(String groupKeyInstanceId) {
this.groupKeyInstanceId = groupKeyInstanceId;
public void setNodeInstanceIdMapJson(String nodeInstanceIdMapJson) {
this.nodeInstanceIdMapJson = nodeInstanceIdMapJson;
}
}

View File

@@ -79,7 +79,7 @@ public class SQLParserVO {
/**
* group_key_instance_id
*/
private String groupKeyInstanceIdField = "group_key_instance_id";
private String nodeInstanceIdMapJsondField = "node_instance_id_map_json";
/**
* 决策路由字段
@@ -443,12 +443,12 @@ public class SQLParserVO {
this.elDataMd5Field = elDataMd5Field;
}
public String getGroupKeyInstanceIdField() {
return groupKeyInstanceIdField;
public String getNodeInstanceIdMapJsondField() {
return nodeInstanceIdMapJsondField;
}
public void setGroupKeyInstanceIdField(String groupKeyInstanceIdField) {
this.groupKeyInstanceIdField = groupKeyInstanceIdField;
public void setNodeInstanceIdMapJsondField(String nodeInstanceIdMapJsondField) {
this.nodeInstanceIdMapJsondField = nodeInstanceIdMapJsondField;
}
public DataSourceConfig getBaomidou() {

View File

@@ -2,6 +2,7 @@ package com.yomahub.liteflow.test.sqlInstanceId;
import com.yomahub.liteflow.core.FlowExecutor;
import com.yomahub.liteflow.flow.LiteflowResponse;
import com.yomahub.liteflow.flow.entity.InstanceIdDto;
import com.yomahub.liteflow.flow.instanceId.NodeInstanceIdManageSpi;
import com.yomahub.liteflow.flow.instanceId.NodeInstanceIdManageSpiHolder;
import com.yomahub.liteflow.parser.sql.exception.ELSQLException;
@@ -51,18 +52,17 @@ public class SQLWithXmlELInstanceIdSpringbootTest extends BaseTest {
// 查询数据库实例id
String instanceId = queryInstanceId("r_chain4");
// 解析 JSON
JSONObject jsonObject = new JSONObject(instanceId);
JSONArray jsonArray = jsonObject.getJSONArray("DEFAULT_KEY");
List<InstanceIdDto> instanceIdDtos = JsonUtil.parseList(instanceId, InstanceIdDto.class);
// 构造实例id字符串
StringBuilder result = new StringBuilder();
for (int i = 0; i < jsonArray.length(); i += 2) {
String key = jsonArray.getString(i);
String value = jsonArray.getString(i + 1);
result.append(key).append("[").append(value).append("]");
if (i + 2 < jsonArray.length()) {
int i = 0;
for (InstanceIdDto dto : instanceIdDtos) {
result.append(dto.getNodeId()).append("[").append(dto.getInstanceId()).append("]");
if (i + 1 < instanceIdDtos.size()) {
result.append("==>");
}
i++;
}
LiteflowResponse response = flowExecutor.execute2Resp("r_chain4", "arg");
@@ -117,7 +117,7 @@ public class SQLWithXmlELInstanceIdSpringbootTest extends BaseTest {
String res = "";
while (rs.next()) {
res = rs.getString("GROUP_KEY_INSTANCE_ID");
res = rs.getString("node_instance_id_map_json");
}
return res;
} catch (SQLException e) {