feat(X-Pack): [数据填报]支持Oracle插件

This commit is contained in:
ulleo
2025-04-15 17:17:00 +08:00
committed by Junjun
parent e0acb79c74
commit 4b47344e26

View File

@@ -486,32 +486,29 @@ public class CalciteProvider extends Provider {
// schema
ResultSet resultSet = null;
try {
if (StringUtils.isNotEmpty(datasourceConfiguration.getCharset()) && StringUtils.isNotEmpty(datasourceConfiguration.getTargetCharset())) {
datasourceRequest.setQuery(new String(datasourceRequest.getQuery().getBytes(datasourceConfiguration.getTargetCharset()), datasourceConfiguration.getCharset()));
}
} catch (Exception e) {
e.printStackTrace();
}
try (Connection con = getConnectionFromPool(datasourceRequest.getDatasource().getId()); Statement statement = getPreparedStatement(con, datasourceConfiguration.getQueryTimeout(), datasourceRequest.getQuery(), datasourceRequest.getTableFieldWithValues())) {
if (DatasourceConfiguration.DatasourceType.valueOf(value.getType()) == DatasourceConfiguration.DatasourceType.oracle) {
if (!CollectionUtils.isNotEmpty(datasourceRequest.getTableFieldWithValues())) {
statement.executeUpdate("ALTER SESSION SET CURRENT_SCHEMA = " + datasourceConfiguration.getSchema());
}
}
try (Connection con = getConnectionFromPool(datasourceRequest.getDatasource().getId())) {
Statement statement = getStatement(value, con, datasourceRequest, datasourceConfiguration, null);
if (CollectionUtils.isNotEmpty(datasourceRequest.getTableFieldWithValues())) {
LogUtil.info("execWithPreparedStatement sql: " + datasourceRequest.getQuery());
for (int i = 0; i < datasourceRequest.getTableFieldWithValues().size(); i++) {
try {
Object valueObject = datasourceRequest.getTableFieldWithValues().get(i).getValue();
if (valueObject instanceof String) {
if (valueObject instanceof String
&& DatasourceConfiguration.DatasourceType.valueOf(value.getType()) == DatasourceConfiguration.DatasourceType.oracle) {
if (StringUtils.isNotEmpty(datasourceConfiguration.getCharset()) && StringUtils.isNotEmpty(datasourceConfiguration.getTargetCharset())) {
//转换为数据库的字符集
valueObject = new String(((String) valueObject).getBytes(datasourceConfiguration.getTargetCharset()), datasourceConfiguration.getCharset());
}
}
if (valueObject instanceof String && datasourceRequest.getTableFieldWithValues().get(i).getType().equals(Types.CLOB) && DatasourceConfiguration.DatasourceType.valueOf(value.getType()) == DatasourceConfiguration.DatasourceType.oracle) {
Reader reader = new StringReader((String) valueObject);
((PreparedStatement) statement).setCharacterStream(i + 1, reader, ((String) valueObject).length());
if (datasourceRequest.getTableFieldWithValues().get(i).getType().equals(Types.CLOB)) {
Reader reader = new StringReader((String) valueObject);
((PreparedStatement) statement).setCharacterStream(i + 1, reader, ((String) valueObject).length());
} else {
((PreparedStatement) statement).setObject(i + 1, valueObject, datasourceRequest.getTableFieldWithValues().get(i).getType());
}
} else {
((PreparedStatement) statement).setObject(i + 1, valueObject, datasourceRequest.getTableFieldWithValues().get(i).getType());
}
@@ -552,32 +549,28 @@ public class CalciteProvider extends Provider {
DatasourceConfiguration datasourceConfiguration = JsonUtil.parseObject(datasourceRequest.getDatasource().getConfiguration(), DatasourceConfiguration.class);
// schema
ResultSet resultSet = null;
try {
if (StringUtils.isNotEmpty(datasourceConfiguration.getCharset()) && StringUtils.isNotEmpty(datasourceConfiguration.getTargetCharset())) {
datasourceRequest.setQuery(new String(datasourceRequest.getQuery().getBytes(datasourceConfiguration.getTargetCharset()), datasourceConfiguration.getCharset()));
}
} catch (Exception e) {
e.printStackTrace();
}
try (Connection con = getConnectionFromPool(datasourceRequest.getDatasource().getId()); Statement statement = getPreparedStatement(con, datasourceConfiguration.getQueryTimeout(), datasourceRequest.getQuery(), datasourceRequest.getTableFieldWithValues())) {
if (DatasourceConfiguration.DatasourceType.valueOf(value.getType()) == DatasourceConfiguration.DatasourceType.oracle) {
if (!CollectionUtils.isNotEmpty(datasourceRequest.getTableFieldWithValues())) {
statement.executeUpdate("ALTER SESSION SET CURRENT_SCHEMA = " + datasourceConfiguration.getSchema());
}
}
try (Connection con = getConnectionFromPool(datasourceRequest.getDatasource().getId())) {
Statement statement = getStatement(value, con, datasourceRequest, datasourceConfiguration, null);
if (CollectionUtils.isNotEmpty(datasourceRequest.getTableFieldWithValues())) {
LogUtil.info("execWithPreparedStatement sql: " + datasourceRequest.getQuery());
for (int i = 0; i < datasourceRequest.getTableFieldWithValues().size(); i++) {
try {
Object valueObject = datasourceRequest.getTableFieldWithValues().get(i).getValue();
if (valueObject instanceof String) {
if (valueObject instanceof String
&& DatasourceConfiguration.DatasourceType.valueOf(value.getType()) == DatasourceConfiguration.DatasourceType.oracle) {
if (StringUtils.isNotEmpty(datasourceConfiguration.getCharset()) && StringUtils.isNotEmpty(datasourceConfiguration.getTargetCharset())) {
//转换为数据库的字符集
valueObject = new String(((String) valueObject).getBytes(datasourceConfiguration.getTargetCharset()), datasourceConfiguration.getCharset());
}
}
if (valueObject instanceof String && datasourceRequest.getTableFieldWithValues().get(i).getType().equals(Types.CLOB) && DatasourceConfiguration.DatasourceType.valueOf(value.getType()) == DatasourceConfiguration.DatasourceType.oracle) {
Reader reader = new StringReader((String) valueObject);
((PreparedStatement) statement).setCharacterStream(i + 1, reader, ((String) valueObject).length());
if (datasourceRequest.getTableFieldWithValues().get(i).getType().equals(Types.CLOB)) {
Reader reader = new StringReader((String) valueObject);
((PreparedStatement) statement).setCharacterStream(i + 1, reader, ((String) valueObject).length());
} else {
((PreparedStatement) statement).setObject(i + 1, valueObject, datasourceRequest.getTableFieldWithValues().get(i).getType());
}
} else {
((PreparedStatement) statement).setObject(i + 1, valueObject, datasourceRequest.getTableFieldWithValues().get(i).getType());
}
@@ -606,6 +599,24 @@ public class CalciteProvider extends Provider {
}
}
/**
* 针对Oracle特殊处理
*/
private Statement getStatement(DatasourceSchemaDTO value, Connection con, DatasourceRequest datasourceRequest, DatasourceConfiguration datasourceConfiguration, String autoIncrementPkName) throws Exception {
Statement statement;
if (DatasourceConfiguration.DatasourceType.valueOf(value.getType()) == DatasourceConfiguration.DatasourceType.oracle) {
statement = getStatement(con, datasourceConfiguration.getQueryTimeout());
statement.executeUpdate("ALTER SESSION SET CURRENT_SCHEMA = " + datasourceConfiguration.getSchema());
//调整字符集
if (StringUtils.isNotEmpty(datasourceConfiguration.getCharset()) && StringUtils.isNotEmpty(datasourceConfiguration.getTargetCharset())) {
datasourceRequest.setQuery(new String(datasourceRequest.getQuery().getBytes(datasourceConfiguration.getTargetCharset()), datasourceConfiguration.getCharset()));
}
}
statement = getPreparedStatement(con, datasourceConfiguration.getQueryTimeout(), datasourceRequest.getQuery(), datasourceRequest.getTableFieldWithValues(), autoIncrementPkName, datasourceConfiguration);
return statement;
}
@Override
public ExecuteResult executeUpdate(DatasourceRequest datasourceRequest, String autoIncrementPkName) throws DEException {
DatasourceSchemaDTO value = datasourceRequest.getDsList().entrySet().iterator().next().getValue();
@@ -613,19 +624,9 @@ public class CalciteProvider extends Provider {
DatasourceConfiguration datasourceConfiguration = JsonUtil.parseObject(datasourceRequest.getDatasource().getConfiguration(), DatasourceConfiguration.class);
// schema
ResultSet resultSet = null;
try {
if (StringUtils.isNotEmpty(datasourceConfiguration.getCharset()) && StringUtils.isNotEmpty(datasourceConfiguration.getTargetCharset())) {
datasourceRequest.setQuery(new String(datasourceRequest.getQuery().getBytes(datasourceConfiguration.getTargetCharset()), datasourceConfiguration.getCharset()));
}
} catch (Exception e) {
e.printStackTrace();
}
try (Connection con = getConnectionFromPool(datasourceRequest.getDatasource().getId()); Statement statement = getPreparedStatement(con, datasourceConfiguration.getQueryTimeout(), datasourceRequest.getQuery(), datasourceRequest.getTableFieldWithValues(), autoIncrementPkName, datasourceConfiguration)) {
if (DatasourceConfiguration.DatasourceType.valueOf(value.getType()) == DatasourceConfiguration.DatasourceType.oracle) {
if (!CollectionUtils.isNotEmpty(datasourceRequest.getTableFieldWithValues())) {
statement.executeUpdate("ALTER SESSION SET CURRENT_SCHEMA = " + datasourceConfiguration.getSchema());
}
}
try (Connection con = getConnectionFromPool(datasourceRequest.getDatasource().getId())) {
Statement statement = getStatement(value, con, datasourceRequest, datasourceConfiguration, autoIncrementPkName);
int count = 0;
if (CollectionUtils.isNotEmpty(datasourceRequest.getTableFieldWithValues())) {
@@ -633,14 +634,19 @@ public class CalciteProvider extends Provider {
for (int i = 0; i < datasourceRequest.getTableFieldWithValues().size(); i++) {
try {
Object valueObject = datasourceRequest.getTableFieldWithValues().get(i).getValue();
if (valueObject instanceof String) {
if (valueObject instanceof String
&& DatasourceConfiguration.DatasourceType.valueOf(value.getType()) == DatasourceConfiguration.DatasourceType.oracle) {
if (StringUtils.isNotEmpty(datasourceConfiguration.getCharset()) && StringUtils.isNotEmpty(datasourceConfiguration.getTargetCharset())) {
//转换为数据库的字符集
valueObject = new String(((String) valueObject).getBytes(datasourceConfiguration.getTargetCharset()), datasourceConfiguration.getCharset());
}
}
if (valueObject instanceof String && datasourceRequest.getTableFieldWithValues().get(i).getType().equals(Types.CLOB) && DatasourceConfiguration.DatasourceType.valueOf(value.getType()) == DatasourceConfiguration.DatasourceType.oracle) {
Reader reader = new StringReader((String) valueObject);
((PreparedStatement) statement).setCharacterStream(i + 1, reader, ((String) valueObject).length());
if (datasourceRequest.getTableFieldWithValues().get(i).getType().equals(Types.CLOB)) {
Reader reader = new StringReader((String) valueObject);
((PreparedStatement) statement).setCharacterStream(i + 1, reader, ((String) valueObject).length());
} else {
((PreparedStatement) statement).setObject(i + 1, valueObject, datasourceRequest.getTableFieldWithValues().get(i).getType());
}
} else {
((PreparedStatement) statement).setObject(i + 1, valueObject, datasourceRequest.getTableFieldWithValues().get(i).getType());
}
@@ -748,11 +754,10 @@ public class CalciteProvider extends Provider {
row[j] = rs.getBlob(j + 1) == null ? "" : rs.getBlob(j + 1).toString();
}
if (targetCharset != null && StringUtils.isNotEmpty(rs.getString(j + 1)) && columnType == Types.CLOB) {
Clob c = rs.getClob(j + 1);
if (originCharset == null) {
row[j] = new String((c.getSubString(1, (int) c.length())).getBytes(), targetCharset);
row[j] = new String(rs.getString(j + 1).getBytes(), targetCharset);
} else {
row[j] = new String((c.getSubString(1, (int) c.length())).getBytes(originCharset), targetCharset);
row[j] = new String(rs.getString(j + 1).getBytes(originCharset), targetCharset);
}
} else if (targetCharset != null && StringUtils.isNotEmpty(rs.getString(j + 1)) && (columnType != Types.NVARCHAR && columnType != Types.NCHAR)) {
row[j] = new String(rs.getBytes(j + 1), targetCharset);
@@ -1505,13 +1510,6 @@ public class CalciteProvider extends Provider {
if (CollectionUtils.isNotEmpty(values)) {
PreparedStatement stat = null;
String pkName = autoIncrementPkName;
try {
if (StringUtils.isNotEmpty(datasourceConfiguration.getCharset()) && StringUtils.isNotEmpty(datasourceConfiguration.getTargetCharset())) {
pkName = new String(pkName.getBytes(datasourceConfiguration.getTargetCharset()), datasourceConfiguration.getCharset());
}
} catch (Exception e) {
e.printStackTrace();
}
try {
if (StringUtils.isNotBlank(autoIncrementPkName)) {
String[] generatedColumns = {pkName};