feat(X-Pack): [数据填报]支持跟随数据源管理配置的字符集编码

This commit is contained in:
ulleo
2025-04-11 16:54:01 +08:00
committed by Junjun
parent a51f73e645
commit 7a619de59c
2 changed files with 64 additions and 12 deletions

View File

@@ -457,21 +457,30 @@ public class CalciteProvider extends Provider {
// schema
ResultSet resultSet = null;
try {
if (StringUtils.isNotEmpty(datasourceConfiguration.getCharset()) && StringUtils.isNotEmpty(datasourceConfiguration.getTargetCharset())) {
datasourceRequest.setQuery(new String(datasourceRequest.getQuery().getBytes(datasourceConfiguration.getTargetCharset()), datasourceConfiguration.getCharset()));
}
} catch (Exception e) {
e.printStackTrace();
}
try (Connection con = getConnectionFromPool(datasourceRequest.getDatasource().getId()); Statement statement = getPreparedStatement(con, datasourceConfiguration.getQueryTimeout(), datasourceRequest.getQuery(), datasourceRequest.getTableFieldWithValues())) {
if (DatasourceConfiguration.DatasourceType.valueOf(value.getType()) == DatasourceConfiguration.DatasourceType.oracle) {
if (!CollectionUtils.isNotEmpty(datasourceRequest.getTableFieldWithValues())) {
statement.executeUpdate("ALTER SESSION SET CURRENT_SCHEMA = " + datasourceConfiguration.getSchema());
}
}
if (StringUtils.isNotEmpty(datasourceConfiguration.getCharset()) && StringUtils.isNotEmpty(datasourceConfiguration.getTargetCharset())) {
datasourceRequest.setQuery(new String(datasourceRequest.getQuery().getBytes(datasourceConfiguration.getTargetCharset()), datasourceConfiguration.getCharset()));
}
if (CollectionUtils.isNotEmpty(datasourceRequest.getTableFieldWithValues())) {
LogUtil.info("execWithPreparedStatement sql: " + datasourceRequest.getQuery());
for (int i = 0; i < datasourceRequest.getTableFieldWithValues().size(); i++) {
try {
((PreparedStatement) statement).setObject(i + 1, datasourceRequest.getTableFieldWithValues().get(i).getValue(), datasourceRequest.getTableFieldWithValues().get(i).getType());
Object valueObject = datasourceRequest.getTableFieldWithValues().get(i).getValue();
if (valueObject instanceof String) {
if (StringUtils.isNotEmpty(datasourceConfiguration.getCharset()) && StringUtils.isNotEmpty(datasourceConfiguration.getTargetCharset())) {
valueObject = new String(((String) valueObject).getBytes(datasourceConfiguration.getTargetCharset()), datasourceConfiguration.getCharset());
}
}
((PreparedStatement) statement).setObject(i + 1, valueObject, datasourceRequest.getTableFieldWithValues().get(i).getType());
LogUtil.info("execWithPreparedStatement param[" + (i + 1) + "](" + datasourceRequest.getTableFieldWithValues().get(i).getColumnTypeName() + "): " + datasourceRequest.getTableFieldWithValues().get(i).getValue());
} catch (SQLException e) {
throw new SQLException(e.getMessage() + ". VALUE: " + datasourceRequest.getTableFieldWithValues().get(i).getValue().toString() + " , TARGET TYPE: " + datasourceRequest.getTableFieldWithValues().get(i).getColumnTypeName());
@@ -509,6 +518,13 @@ public class CalciteProvider extends Provider {
DatasourceConfiguration datasourceConfiguration = JsonUtil.parseObject(datasourceRequest.getDatasource().getConfiguration(), DatasourceConfiguration.class);
// schema
ResultSet resultSet = null;
try {
if (StringUtils.isNotEmpty(datasourceConfiguration.getCharset()) && StringUtils.isNotEmpty(datasourceConfiguration.getTargetCharset())) {
datasourceRequest.setQuery(new String(datasourceRequest.getQuery().getBytes(datasourceConfiguration.getTargetCharset()), datasourceConfiguration.getCharset()));
}
} catch (Exception e) {
e.printStackTrace();
}
try (Connection con = getConnectionFromPool(datasourceRequest.getDatasource().getId()); Statement statement = getPreparedStatement(con, datasourceConfiguration.getQueryTimeout(), datasourceRequest.getQuery(), datasourceRequest.getTableFieldWithValues())) {
if (DatasourceConfiguration.DatasourceType.valueOf(value.getType()) == DatasourceConfiguration.DatasourceType.oracle) {
if (!CollectionUtils.isNotEmpty(datasourceRequest.getTableFieldWithValues())) {
@@ -519,7 +535,13 @@ public class CalciteProvider extends Provider {
LogUtil.info("execWithPreparedStatement sql: " + datasourceRequest.getQuery());
for (int i = 0; i < datasourceRequest.getTableFieldWithValues().size(); i++) {
try {
((PreparedStatement) statement).setObject(i + 1, datasourceRequest.getTableFieldWithValues().get(i).getValue(), datasourceRequest.getTableFieldWithValues().get(i).getType());
Object valueObject = datasourceRequest.getTableFieldWithValues().get(i).getValue();
if (valueObject instanceof String) {
if (StringUtils.isNotEmpty(datasourceConfiguration.getCharset()) && StringUtils.isNotEmpty(datasourceConfiguration.getTargetCharset())) {
valueObject = new String(((String) valueObject).getBytes(datasourceConfiguration.getTargetCharset()), datasourceConfiguration.getCharset());
}
}
((PreparedStatement) statement).setObject(i + 1, valueObject, datasourceRequest.getTableFieldWithValues().get(i).getType());
LogUtil.info("execWithPreparedStatement param[" + (i + 1) + "](" + datasourceRequest.getTableFieldWithValues().get(i).getColumnTypeName() + "): " + datasourceRequest.getTableFieldWithValues().get(i).getValue());
} catch (SQLException e) {
throw new SQLException(e.getMessage() + ". VALUE: " + datasourceRequest.getTableFieldWithValues().get(i).getValue().toString() + " , TARGET TYPE: " + datasourceRequest.getTableFieldWithValues().get(i).getColumnTypeName());
@@ -546,13 +568,20 @@ public class CalciteProvider extends Provider {
}
@Override
public ExecuteResult executeUpdate(DatasourceRequest datasourceRequest, boolean autoIncrement) throws DEException {
public ExecuteResult executeUpdate(DatasourceRequest datasourceRequest, String autoIncrementPkName) throws DEException {
DatasourceSchemaDTO value = datasourceRequest.getDsList().entrySet().iterator().next().getValue();
datasourceRequest.setDatasource(value);
DatasourceConfiguration datasourceConfiguration = JsonUtil.parseObject(datasourceRequest.getDatasource().getConfiguration(), DatasourceConfiguration.class);
// schema
ResultSet resultSet = null;
try (Connection con = getConnectionFromPool(datasourceRequest.getDatasource().getId()); Statement statement = getPreparedStatement(con, datasourceConfiguration.getQueryTimeout(), datasourceRequest.getQuery(), datasourceRequest.getTableFieldWithValues())) {
try {
if (StringUtils.isNotEmpty(datasourceConfiguration.getCharset()) && StringUtils.isNotEmpty(datasourceConfiguration.getTargetCharset())) {
datasourceRequest.setQuery(new String(datasourceRequest.getQuery().getBytes(datasourceConfiguration.getTargetCharset()), datasourceConfiguration.getCharset()));
}
} catch (Exception e) {
e.printStackTrace();
}
try (Connection con = getConnectionFromPool(datasourceRequest.getDatasource().getId()); Statement statement = getPreparedStatement(con, datasourceConfiguration.getQueryTimeout(), datasourceRequest.getQuery(), datasourceRequest.getTableFieldWithValues(), autoIncrementPkName, datasourceConfiguration)) {
if (DatasourceConfiguration.DatasourceType.valueOf(value.getType()) == DatasourceConfiguration.DatasourceType.oracle) {
if (!CollectionUtils.isNotEmpty(datasourceRequest.getTableFieldWithValues())) {
statement.executeUpdate("ALTER SESSION SET CURRENT_SCHEMA = " + datasourceConfiguration.getSchema());
@@ -564,7 +593,13 @@ public class CalciteProvider extends Provider {
LogUtil.info("execWithPreparedStatement sql: " + datasourceRequest.getQuery());
for (int i = 0; i < datasourceRequest.getTableFieldWithValues().size(); i++) {
try {
((PreparedStatement) statement).setObject(i + 1, datasourceRequest.getTableFieldWithValues().get(i).getValue(), datasourceRequest.getTableFieldWithValues().get(i).getType());
Object valueObject = datasourceRequest.getTableFieldWithValues().get(i).getValue();
if (valueObject instanceof String) {
if (StringUtils.isNotEmpty(datasourceConfiguration.getCharset()) && StringUtils.isNotEmpty(datasourceConfiguration.getTargetCharset())) {
valueObject = new String(((String) valueObject).getBytes(datasourceConfiguration.getTargetCharset()), datasourceConfiguration.getCharset());
}
}
((PreparedStatement) statement).setObject(i + 1, valueObject, datasourceRequest.getTableFieldWithValues().get(i).getType());
LogUtil.info("execWithPreparedStatement param[" + (i + 1) + "](" + datasourceRequest.getTableFieldWithValues().get(i).getColumnTypeName() + "): " + datasourceRequest.getTableFieldWithValues().get(i).getValue());
} catch (SQLException e) {
throw new SQLException(e.getMessage() + ". VALUE: " + datasourceRequest.getTableFieldWithValues().get(i).getValue().toString() + " , TARGET TYPE: " + datasourceRequest.getTableFieldWithValues().get(i).getColumnTypeName());
@@ -578,7 +613,7 @@ public class CalciteProvider extends Provider {
ExecuteResult result = new ExecuteResult();
result.setCount(count);
if (autoIncrement) {
if (StringUtils.isNotBlank(autoIncrementPkName)) {
List<String> generatedKeys = new ArrayList<>();
ResultSet keys = statement.getGeneratedKeys();
while (keys.next()) {
@@ -1400,13 +1435,30 @@ public class CalciteProvider extends Provider {
}
public Statement getPreparedStatement(Connection connection, int queryTimeout, String sql, List<TableFieldWithValue> values) throws Exception {
return getPreparedStatement(connection, queryTimeout, sql, values, null, null);
}
public Statement getPreparedStatement(Connection connection, int queryTimeout, String sql, List<TableFieldWithValue> values, String autoIncrementPkName, DatasourceConfiguration datasourceConfiguration) throws Exception {
if (connection == null) {
throw new Exception("Failed to get connection!");
}
if (CollectionUtils.isNotEmpty(values)) {
PreparedStatement stat = null;
String pkName = autoIncrementPkName;
try {
stat = connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS);
if (StringUtils.isNotEmpty(datasourceConfiguration.getCharset()) && StringUtils.isNotEmpty(datasourceConfiguration.getTargetCharset())) {
pkName = new String(pkName.getBytes(datasourceConfiguration.getTargetCharset()), datasourceConfiguration.getCharset());
}
} catch (Exception e) {
e.printStackTrace();
}
try {
if (StringUtils.isNotBlank(autoIncrementPkName)) {
String[] generatedColumns = {pkName};
stat = connection.prepareStatement(sql, generatedColumns);
} else {
stat = connection.prepareStatement(sql);
}
stat.setQueryTimeout(queryTimeout);
} catch (Exception e) {
DEException.throwException(e.getMessage());

View File

@@ -97,7 +97,7 @@ public abstract class Provider {
}
public ExecuteResult executeUpdate(DatasourceRequest datasourceRequest, boolean autoIncrement) {
public ExecuteResult executeUpdate(DatasourceRequest datasourceRequest, String autoIncrementPkName) {
return new ExecuteResult();
}