Merge remote-tracking branch 'origin/main' into main

# Conflicts:
#	frontend/src/views/panel/list/PanelViewShow.vue
This commit is contained in:
wangjiahao
2021-04-19 11:51:43 +08:00
46 changed files with 1400 additions and 545 deletions

View File

@@ -2,8 +2,6 @@ package io.dataease.config;
import com.fit2cloud.autoconfigure.QuartzAutoConfiguration;
import io.dataease.commons.utils.CommonThreadPool;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import org.pentaho.di.core.KettleEnvironment;
import org.pentaho.di.repository.filerep.KettleFileRepository;
@@ -40,7 +38,22 @@ public class CommonConfig {
SparkSession spark = SparkSession.builder()
.appName(env.getProperty("spark.appName", "DataeaseJob"))
.master(env.getProperty("spark.master", "local[*]"))
.config("spark.scheduler.mode", "FAIR")
.config("spark.scheduler.mode", env.getProperty("spark.scheduler.mode", "FAIR"))
.config("spark.serializer", env.getProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer"))
.config("spark.executor.cores", env.getProperty("spark.executor.cores", "8"))
.config("spark.executor.memory", env.getProperty("spark.executor.memory", "6442450944b"))
.config("spark.locality.wait", env.getProperty("spark.locality.wait", "600000"))
.config("spark.maxRemoteBlockSizeFetchToMem", env.getProperty("spark.maxRemoteBlockSizeFetchToMem", "2000m"))
.config("spark.shuffle.detectCorrupt", env.getProperty("spark.shuffle.detectCorrupt", "false"))
.config("spark.shuffle.service.enabled", env.getProperty("spark.shuffle.service.enabled", "true"))
.config("spark.sql.adaptive.enabled", env.getProperty("spark.sql.adaptive.enabled", "true"))
.config("spark.sql.adaptive.shuffle.targetPostShuffleInputSize", env.getProperty("spark.sql.adaptive.shuffle.targetPostShuffleInputSize", "200M"))
.config("spark.sql.broadcastTimeout", env.getProperty("spark.sql.broadcastTimeout", "12000"))
.config("spark.sql.retainGroupColumns", env.getProperty("spark.sql.retainGroupColumns", "false"))
.config("spark.sql.sortMergeJoinExec.buffer.in.memory.threshold", env.getProperty("spark.sql.sortMergeJoinExec.buffer.in.memory.threshold", "100000"))
.config("spark.sql.sortMergeJoinExec.buffer.spill.threshold", env.getProperty("spark.sql.sortMergeJoinExec.buffer.spill.threshold", "100000"))
.config("spark.sql.variable.substitute", env.getProperty("spark.sql.variable.substitute", "false"))
.config("spark.temp.expired.time", env.getProperty("spark.temp.expired.time", "3600"))
.getOrCreate();
return spark;
}

View File

@@ -1,6 +1,8 @@
package io.dataease.controller.chart;
import io.dataease.base.domain.ChartViewWithBLOBs;
import io.dataease.controller.request.chart.ChartExtFilterRequest;
import io.dataease.controller.request.chart.ChartExtRequest;
import io.dataease.controller.request.chart.ChartViewRequest;
import io.dataease.dto.chart.ChartViewDTO;
import io.dataease.service.chart.ChartViewService;
@@ -41,7 +43,12 @@ public class ChartViewController {
}
@PostMapping("/getData/{id}")
public ChartViewDTO getData(@PathVariable String id) throws Exception {
return chartViewService.getData(id);
public ChartViewDTO getData(@PathVariable String id, @RequestBody ChartExtRequest requestList) throws Exception {
return chartViewService.getData(id, requestList);
}
@PostMapping("chartDetail/{id}")
public Map<String, Object> chartDetail(@PathVariable String id) {
return chartViewService.getChartDetail(id);
}
}

View File

@@ -82,4 +82,8 @@ public class DataSetTableController {
dataSetTableService.saveIncrementalConfig(datasetTableIncrementalConfig);
}
@PostMapping("datasetDetail/{id}")
public Map<String, Object> datasetDetail(@PathVariable String id) {
return dataSetTableService.getDatasetDetail(id);
}
}

View File

@@ -0,0 +1,22 @@
package io.dataease.controller.request.chart;
import io.dataease.base.domain.DatasetTableField;
import lombok.Getter;
import lombok.Setter;
import java.util.List;
/**
* @Author gin
* @Date 2021/4/19 10:24 上午
*/
@Getter
@Setter
public class ChartExtFilterRequest {
private String componentId;
private String fieldId;
private String operator;
private List<String> value;
private List<String> viewIds;
private DatasetTableField datasetTableField;
}

View File

@@ -0,0 +1,16 @@
package io.dataease.controller.request.chart;
import lombok.Getter;
import lombok.Setter;
import java.util.List;
/**
* @Author gin
* @Date 2021/4/19 11:29 上午
*/
@Getter
@Setter
public class ChartExtRequest {
private List<ChartExtFilterRequest> filter;
}

View File

@@ -19,7 +19,7 @@ public abstract class DatasourceProvider {
abstract public List<String> getTables(DatasourceRequest datasourceRequest) throws Exception;
public List<TableFiled> getTableFileds(DatasourceRequest datasourceRequest) throws Exception{
public List<TableFiled> getTableFileds(DatasourceRequest datasourceRequest) throws Exception {
return new ArrayList<>();
};
@@ -27,7 +27,7 @@ public abstract class DatasourceProvider {
getData(datasourceRequest);
}
abstract public Long count(DatasourceRequest datasourceRequest)throws Exception;
abstract public Long count(DatasourceRequest datasourceRequest) throws Exception;
abstract public List<String[]> getPageData(DatasourceRequest datasourceRequest) throws Exception;
@@ -35,4 +35,5 @@ public abstract class DatasourceProvider {
abstract public List<TableFiled> fetchResultField(ResultSet rs) throws Exception;
abstract public void initConnectionPool(DatasourceRequest datasourceRequest) throws Exception;
}

View File

@@ -13,24 +13,28 @@ import org.springframework.stereotype.Service;
import java.sql.*;
import java.text.MessageFormat;
import java.util.*;
import java.util.concurrent.ArrayBlockingQueue;
@Service("jdbc")
public class JdbcProvider extends DatasourceProvider {
private static Map<String, ArrayBlockingQueue<Connection>> jdbcConnection = new HashMap<>();
private static int poolSize = 20;
@Override
public List<String[]> getData(DatasourceRequest datasourceRequest) throws Exception {
List<String[]> list = new LinkedList<>();
try (
Connection connection = getConnection(datasourceRequest);
Statement stat = connection.createStatement();
ResultSet rs = stat.executeQuery(datasourceRequest.getQuery())
) {
Connection connection = null;
try {
connection = getConnectionFromPool(datasourceRequest);
Statement stat = connection.createStatement();
ResultSet rs = stat.executeQuery(datasourceRequest.getQuery());
list = fetchResult(rs);
} catch (SQLException e) {
throw new Exception("ERROR:" + e.getMessage(), e);
} catch (Exception e) {
throw new Exception("ERROR:" + e.getMessage(), e);
}finally {
returnSource(connection, datasourceRequest.getDatasource().getId());
}
return list;
}
@@ -38,14 +42,18 @@ public class JdbcProvider extends DatasourceProvider {
@Override
public ResultSet getDataResultSet(DatasourceRequest datasourceRequest) throws Exception {
ResultSet rs;
Connection connection = null;
try {
Connection connection = getConnection(datasourceRequest);
connection = getConnectionFromPool(datasourceRequest);
Statement stat = connection.createStatement();
rs = stat.executeQuery(datasourceRequest.getQuery());
returnSource(connection, datasourceRequest.getDatasource().getId());
} catch (SQLException e) {
throw new Exception("ERROR:" + e.getMessage(), e);
} catch (Exception e) {
throw new Exception("ERROR:" + e.getMessage(), e);
}finally {
returnSource(connection, datasourceRequest.getDatasource().getId());
}
return rs;
}
@@ -53,16 +61,19 @@ public class JdbcProvider extends DatasourceProvider {
@Override
public List<String[]> getPageData(DatasourceRequest datasourceRequest) throws Exception {
List<String[]> list = new LinkedList<>();
try (
Connection connection = getConnection(datasourceRequest);
Statement stat = connection.createStatement();
ResultSet rs = stat.executeQuery(datasourceRequest.getQuery() + MessageFormat.format(" LIMIT {0}, {1}", (datasourceRequest.getStartPage() - 1) * datasourceRequest.getPageSize(), datasourceRequest.getPageSize()))
) {
Connection connection = null;
try {
connection = getConnectionFromPool(datasourceRequest);
Statement stat = connection.createStatement();
ResultSet rs = stat.executeQuery(datasourceRequest.getQuery() + MessageFormat.format(" LIMIT {0}, {1}", (datasourceRequest.getStartPage() - 1) * datasourceRequest.getPageSize(), datasourceRequest.getPageSize()));
returnSource(connection, datasourceRequest.getDatasource().getId());
list = fetchResult(rs);
} catch (SQLException e) {
throw new Exception("ERROR:" + e.getMessage(), e);
} catch (Exception e) {
throw new Exception("ERROR:" + e.getMessage(), e);
}finally {
returnSource(connection, datasourceRequest.getDatasource().getId());
}
return list;
}
@@ -112,23 +123,28 @@ public class JdbcProvider extends DatasourceProvider {
public List<String> getTables(DatasourceRequest datasourceRequest) throws Exception {
List<String> tables = new ArrayList<>();
String queryStr = getTablesSql(datasourceRequest);
try (Connection con = getConnection(datasourceRequest); Statement ps = con.createStatement()) {
Connection con = null;
try {
con = getConnectionFromPool(datasourceRequest);
Statement ps = con.createStatement();
ResultSet resultSet = ps.executeQuery(queryStr);
while (resultSet.next()) {
tables.add(resultSet.getString(1));
}
return tables;
} catch (Exception e) {
throw new Exception("ERROR: " + e.getMessage(), e);
}finally {
returnSource(con, datasourceRequest.getDatasource().getId());
}
return tables;
}
@Override
public List<TableFiled> getTableFileds(DatasourceRequest datasourceRequest) throws Exception {
List<TableFiled> list = new LinkedList<>();
try (
Connection connection = getConnection(datasourceRequest);
) {
Connection connection = null;
try {
connection = getConnectionFromPool(datasourceRequest);
DatabaseMetaData databaseMetaData = connection.getMetaData();
ResultSet resultSet = databaseMetaData.getColumns(null, "%", datasourceRequest.getTable().toUpperCase(), "%");
while (resultSet.next()) {
@@ -152,6 +168,8 @@ public class JdbcProvider extends DatasourceProvider {
throw new Exception("ERROR:" + e.getMessage(), e);
} catch (Exception e) {
throw new Exception("ERROR:" + e.getMessage(), e);
}finally {
returnSource(connection, datasourceRequest.getDatasource().getId());
}
return list;
}
@@ -161,27 +179,73 @@ public class JdbcProvider extends DatasourceProvider {
@Override
public void test(DatasourceRequest datasourceRequest) throws Exception {
String queryStr = getTablesSql(datasourceRequest);
try (Connection con = getConnection(datasourceRequest); Statement ps = con.createStatement()) {
Connection con = null;
try {
con = getConnection(datasourceRequest);
Statement ps = con.createStatement();
ResultSet resultSet = ps.executeQuery(queryStr);
} catch (Exception e) {
throw new Exception("ERROR: " + e.getMessage(), e);
}finally {
con.close();
}
}
public Long count(DatasourceRequest datasourceRequest) throws Exception {
try (Connection con = getConnection(datasourceRequest); Statement ps = con.createStatement()) {
Connection con = null;
try {
con = getConnectionFromPool(datasourceRequest); Statement ps = con.createStatement();
ResultSet resultSet = ps.executeQuery(datasourceRequest.getQuery());
while (resultSet.next()) {
return resultSet.getLong(1);
}
} catch (Exception e) {
throw new Exception("ERROR: " + e.getMessage(), e);
}finally {
returnSource(con, datasourceRequest.getDatasource().getId());
}
return 0L;
}
private Connection getConnection(DatasourceRequest datasourceRequest) throws Exception {
private void returnSource(Connection connection, String dataSourceId) throws Exception{
if(connection != null && !connection.isClosed()){
ArrayBlockingQueue<Connection> connections = jdbcConnection.get(dataSourceId);
connections.put(connection);
}
}
private Connection getConnectionFromPool(DatasourceRequest datasourceRequest)throws Exception {
ArrayBlockingQueue<Connection> connections = jdbcConnection.get(datasourceRequest.getDatasource().getId());
if (connections == null) {
initConnectionPool(datasourceRequest);
}
connections = jdbcConnection.get(datasourceRequest.getDatasource().getId());
Connection co = connections.take();
return co;
}
@Override
public void initConnectionPool(DatasourceRequest datasourceRequest)throws Exception{
ArrayBlockingQueue<Connection> connections = jdbcConnection.get(datasourceRequest.getDatasource().getId());
if (connections == null) {
connections = new ArrayBlockingQueue<>(poolSize);
for (int i = 0; i < poolSize ; i++) {
Connection connection = getConnection(datasourceRequest);
connections.add(connection);
}
jdbcConnection.put(datasourceRequest.getDatasource().getId(), connections);
}else {
for (int i = 0; i < poolSize ; i++) {
Connection connection = connections.take();
connection.close();
connection = getConnection(datasourceRequest);
connections.add(connection);
}
}
}
private static Connection getConnection(DatasourceRequest datasourceRequest) throws Exception {
String username = null;
String password = null;
String driver = null;

View File

@@ -5,6 +5,7 @@ import io.dataease.base.mapper.*;
import io.dataease.base.mapper.ext.ExtDataSourceMapper;
import io.dataease.base.mapper.ext.query.GridExample;
import io.dataease.commons.exception.DEException;
import io.dataease.commons.utils.CommonThreadPool;
import io.dataease.controller.sys.base.BaseGridRequest;
import io.dataease.datasource.provider.DatasourceProvider;
import io.dataease.datasource.provider.ProviderFactory;
@@ -24,7 +25,8 @@ public class DatasourceService {
@Resource
private DatasourceMapper datasourceMapper;
@Resource
private CommonThreadPool commonThreadPool;
@Resource
private ExtDataSourceMapper extDataSourceMapper;
@@ -39,6 +41,7 @@ public class DatasourceService {
datasource.setUpdateTime(currentTimeMillis);
datasource.setCreateTime(currentTimeMillis);
datasourceMapper.insertSelective(datasource);
initConnectionPool(datasource);
return datasource;
}
@@ -68,6 +71,7 @@ public class DatasourceService {
datasource.setCreateTime(null);
datasource.setUpdateTime(System.currentTimeMillis());
datasourceMapper.updateByPrimaryKeySelective(datasource);
initConnectionPool(datasource);
}
public void validate(Datasource datasource) throws Exception {
@@ -89,4 +93,30 @@ public class DatasourceService {
return datasourceMapper.selectByPrimaryKey(id);
}
private void initConnectionPool(Datasource datasource){
commonThreadPool.addTask(() ->{
try {
DatasourceProvider datasourceProvider = ProviderFactory.getProvider(datasource.getType());
DatasourceRequest datasourceRequest = new DatasourceRequest();
datasourceRequest.setDatasource(datasource);
datasourceProvider.initConnectionPool(datasourceRequest);
}catch (Exception e){}
});
}
public void initAllDataSourceConnectionPool(){
List<Datasource> datasources = datasourceMapper.selectByExampleWithBLOBs(new DatasourceExample());
datasources.forEach(datasource -> {
commonThreadPool.addTask(() ->{
try {
DatasourceProvider datasourceProvider = ProviderFactory.getProvider(datasource.getType());
DatasourceRequest datasourceRequest = new DatasourceRequest();
datasourceRequest.setDatasource(datasource);
datasourceProvider.initConnectionPool(datasourceRequest);
}catch (Exception e){
e.printStackTrace();
}
});
});
}
}

View File

@@ -0,0 +1,31 @@
package io.dataease.listener;
import io.dataease.base.domain.DatasetTable;
import io.dataease.base.domain.DatasetTableExample;
import io.dataease.base.domain.DatasetTableField;
import io.dataease.base.mapper.DatasetTableMapper;
import io.dataease.commons.utils.CommonThreadPool;
import io.dataease.datasource.service.DatasourceService;
import io.dataease.service.dataset.DataSetTableFieldsService;
import io.dataease.service.spark.SparkCalc;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.core.annotation.Order;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.List;
@Component
@Order(value = 2)
public class AppStartInitDataSourceListener implements ApplicationListener<ApplicationReadyEvent> {
@Resource
private DatasourceService datasourceService;
@Override
public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {
System.out.println("================= Init datasource connection pool =================");
// 项目启动从数据集中找到定时抽取的表从HBase中读取放入缓存
datasourceService.initAllDataSourceConnectionPool();
}
}

View File

@@ -1,7 +1,6 @@
package io.dataease.listener;
import io.dataease.base.domain.DatasetTableTask;
import io.dataease.job.sechedule.ScheduleManager;
import io.dataease.service.ScheduleService;
import io.dataease.service.dataset.DataSetTableTaskService;
import org.springframework.boot.context.event.ApplicationReadyEvent;

View File

@@ -3,16 +3,10 @@ package io.dataease.listener;
import io.dataease.base.domain.DatasetTable;
import io.dataease.base.domain.DatasetTableExample;
import io.dataease.base.domain.DatasetTableField;
import io.dataease.base.domain.DatasetTableFieldExample;
import io.dataease.base.mapper.DatasetTableFieldMapper;
import io.dataease.base.mapper.DatasetTableMapper;
import io.dataease.commons.utils.CommonBeanFactory;
import io.dataease.commons.utils.CommonThreadPool;
import io.dataease.service.dataset.DataSetTableFieldsService;
import io.dataease.service.spark.SparkCalc;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.core.annotation.Order;
@@ -45,14 +39,14 @@ public class AppStartReadHBaseListener implements ApplicationListener<Applicatio
datasetTableExample.createCriteria().andModeEqualTo(1);
List<DatasetTable> datasetTables = datasetTableMapper.selectByExampleWithBLOBs(datasetTableExample);
for (DatasetTable table : datasetTables) {
commonThreadPool.addTask(() -> {
// commonThreadPool.addTask(() -> {
try {
List<DatasetTableField> fields = dataSetTableFieldsService.getFieldsByTableId(table.getId());
sparkCalc.getHBaseDataAndCache(table.getId(), fields);
} catch (Exception e) {
e.printStackTrace();
}
});
// });
}
}
}

View File

@@ -4,9 +4,10 @@ import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import io.dataease.base.domain.*;
import io.dataease.base.mapper.ChartViewMapper;
import io.dataease.base.mapper.DatasetTableFieldMapper;
import io.dataease.commons.utils.AuthUtils;
import io.dataease.commons.utils.BeanUtils;
import io.dataease.controller.request.chart.ChartExtFilterRequest;
import io.dataease.controller.request.chart.ChartExtRequest;
import io.dataease.controller.request.chart.ChartViewRequest;
import io.dataease.datasource.constants.DatasourceTypes;
import io.dataease.datasource.provider.DatasourceProvider;
@@ -21,6 +22,7 @@ import io.dataease.service.dataset.DataSetTableFieldsService;
import io.dataease.service.dataset.DataSetTableService;
import io.dataease.service.spark.SparkCalc;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;
@@ -88,7 +90,7 @@ public class ChartViewService {
chartViewMapper.deleteByExample(chartViewExample);
}
public ChartViewDTO getData(String id) throws Exception {
public ChartViewDTO getData(String id, ChartExtRequest requestList) throws Exception {
ChartViewWithBLOBs view = chartViewMapper.selectByPrimaryKey(id);
List<ChartViewFieldDTO> xAxis = new Gson().fromJson(view.getXAxis(), new TypeToken<List<ChartViewFieldDTO>>() {
}.getType());
@@ -107,6 +109,24 @@ public class ChartViewService {
// List<DatasetTableField> xList = dataSetTableFieldsService.getListByIds(xIds);
// List<DatasetTableField> yList = dataSetTableFieldsService.getListByIds(yIds);
// 过滤来自仪表盘的条件
List<ChartExtFilterRequest> extFilterList = new ArrayList<>();
if (ObjectUtils.isNotEmpty(requestList.getFilter())) {
for (ChartExtFilterRequest request : requestList.getFilter()) {
DatasetTableField datasetTableField = dataSetTableFieldsService.get(request.getFieldId());
request.setDatasetTableField(datasetTableField);
if (StringUtils.equalsIgnoreCase(datasetTableField.getTableId(), view.getTableId())) {
if (CollectionUtils.isNotEmpty(request.getViewIds())) {
if (request.getViewIds().contains(view.getId())) {
extFilterList.add(request);
}
} else {
extFilterList.add(request);
}
}
}
}
// 获取数据集
DatasetTable table = dataSetTableService.get(view.getTableId());
// 判断连接方式,直连或者定时抽取 table.mode
@@ -119,15 +139,15 @@ public class ChartViewService {
DataTableInfoDTO dataTableInfoDTO = new Gson().fromJson(table.getInfo(), DataTableInfoDTO.class);
if (StringUtils.equalsIgnoreCase(table.getType(), "db")) {
datasourceRequest.setTable(dataTableInfoDTO.getTable());
datasourceRequest.setQuery(getSQL(ds.getType(), dataTableInfoDTO.getTable(), xAxis, yAxis));
datasourceRequest.setQuery(getSQL(ds.getType(), dataTableInfoDTO.getTable(), xAxis, yAxis, extFilterList));
} else if (StringUtils.equalsIgnoreCase(table.getType(), "sql")) {
datasourceRequest.setQuery(getSQL(ds.getType(), " (" + dataTableInfoDTO.getSql() + ") AS tmp ", xAxis, yAxis));
datasourceRequest.setQuery(getSQL(ds.getType(), " (" + dataTableInfoDTO.getSql() + ") AS tmp ", xAxis, yAxis, extFilterList));
}
data = datasourceProvider.getData(datasourceRequest);
} else if (table.getMode() == 1) {// 抽取
// 获取数据集de字段
List<DatasetTableField> fields = dataSetTableFieldsService.getFieldsByTableId(table.getId());
data = sparkCalc.getData(table.getId(), fields, xAxis, yAxis, "tmp_" + view.getId().split("-")[0]);
data = sparkCalc.getData(table.getId(), fields, xAxis, yAxis, "tmp_" + view.getId().split("-")[0], extFilterList);
}
// 图表组件可再扩展
@@ -163,18 +183,45 @@ public class ChartViewService {
return dto;
}
public String getSQL(String type, String table, List<ChartViewFieldDTO> xAxis, List<ChartViewFieldDTO> yAxis) {
public String transMysqlExtFilter(List<ChartExtFilterRequest> requestList) {
if (CollectionUtils.isEmpty(requestList)) {
return "";
}
StringBuilder filter = new StringBuilder();
for (ChartExtFilterRequest request : requestList) {
List<String> value = request.getValue();
if (CollectionUtils.isEmpty(value)) {
continue;
}
DatasetTableField field = request.getDatasetTableField();
filter.append(" AND ")
.append(field.getOriginName())
.append(" ")
.append(transMysqlFilterTerm(request.getOperator()))
.append(" ");
if (StringUtils.containsIgnoreCase(request.getOperator(), "in")) {
filter.append("('").append(StringUtils.join(value, "','")).append("')");
} else if (StringUtils.containsIgnoreCase(request.getOperator(), "like")) {
filter.append("'%").append(value.get(0)).append("%'");
} else {
filter.append("'").append(value.get(0)).append("'");
}
}
return filter.toString();
}
public String getSQL(String type, String table, List<ChartViewFieldDTO> xAxis, List<ChartViewFieldDTO> yAxis, List<ChartExtFilterRequest> extFilterRequestList) {
DatasourceTypes datasourceType = DatasourceTypes.valueOf(type);
switch (datasourceType) {
case mysql:
return transMysqlSQL(table, xAxis, yAxis);
return transMysqlSQL(table, xAxis, yAxis, extFilterRequestList);
case sqlServer:
default:
return "";
}
}
public String transMysqlSQL(String table, List<ChartViewFieldDTO> xAxis, List<ChartViewFieldDTO> yAxis) {
public String transMysqlSQL(String table, List<ChartViewFieldDTO> xAxis, List<ChartViewFieldDTO> yAxis, List<ChartExtFilterRequest> extFilterRequestList) {
// 字段汇总 排序等
String[] field = yAxis.stream().map(y -> "CAST(" + y.getSummary() + "(" + y.getOriginName() + ") AS DECIMAL(20,2)) AS _" + y.getSummary() + "_" + y.getOriginName()).toArray(String[]::new);
String[] group = xAxis.stream().map(ChartViewFieldDTO::getOriginName).toArray(String[]::new);
@@ -185,7 +232,7 @@ public class ChartViewService {
StringUtils.join(group, ","),
StringUtils.join(field, ","),
table,
"",
transMysqlExtFilter(extFilterRequestList),// origin field filter and panel field filter
StringUtils.join(group, ","),
StringUtils.join(order, ","));
if (sql.endsWith(",")) {
@@ -221,6 +268,14 @@ public class ChartViewService {
return " > ";
case "ge":
return " >= ";
case "in":
return " IN ";
case "not in":
return " NOT IN ";
case "like":
return " LIKE ";
case "not like":
return " NOT LIKE ";
case "null":
return " IS NULL ";
case "not_null":
@@ -250,4 +305,15 @@ public class ChartViewService {
throw new RuntimeException("Name can't repeat in same group.");
}
}
public Map<String, Object> getChartDetail(String id) {
Map<String, Object> map = new HashMap<>();
ChartViewWithBLOBs chartViewWithBLOBs = chartViewMapper.selectByPrimaryKey(id);
map.put("chart", chartViewWithBLOBs);
if (ObjectUtils.isNotEmpty(chartViewWithBLOBs)) {
Map<String, Object> datasetDetail = dataSetTableService.getDatasetDetail(chartViewWithBLOBs.getTableId());
map.putAll(datasetDetail);
}
return map;
}
}

View File

@@ -66,4 +66,8 @@ public class DataSetTableFieldsService {
datasetTableFieldExample.createCriteria().andTableIdEqualTo(id);
return datasetTableFieldMapper.selectByExample(datasetTableFieldExample);
}
public DatasetTableField get(String id) {
return datasetTableFieldMapper.selectByPrimaryKey(id);
}
}

View File

@@ -16,6 +16,7 @@ import io.dataease.datasource.provider.ProviderFactory;
import io.dataease.datasource.request.DatasourceRequest;
import io.dataease.dto.dataset.DataTableInfoDTO;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;
@@ -422,4 +423,15 @@ public class DataSetTableService {
throw new RuntimeException("Name can't repeat in same group.");
}
}
public Map<String, Object> getDatasetDetail(String id) {
Map<String, Object> map = new HashMap<>();
DatasetTable table = datasetTableMapper.selectByPrimaryKey(id);
map.put("table", table);
if (ObjectUtils.isNotEmpty(table)) {
Datasource datasource = datasourceMapper.selectByPrimaryKey(table.getDataSourceId());
map.put("datasource", datasource);
}
return map;
}
}

View File

@@ -472,6 +472,8 @@ public class ExtractDataService {
hBaseOutputMeta.setTargetMappingName("target_mapping");
hBaseOutputMeta.setNamedCluster(clusterTemplate);
hBaseOutputMeta.setCoreConfigURL(hbase_conf_file);
hBaseOutputMeta.setDisableWriteToWAL(true);
hBaseOutputMeta.setWriteBufferSize("31457280"); //30M
if (extractType.equalsIgnoreCase("incremental_delete")) {
hBaseOutputMeta.setDeleteRowKey(true);
}

View File

@@ -58,7 +58,7 @@ public class PanelGroupService {
return panelGroupDTOList;
}
public void getTreeChildren(List<PanelGroupDTO> parentPanelGroupDTO){
public void getTreeChildren(List<PanelGroupDTO> parentPanelGroupDTO) {
Optional.ofNullable(parentPanelGroupDTO).ifPresent(parent -> parent.forEach(panelGroupDTO -> {
List<PanelGroupDTO> panelGroupDTOChildren = extPanelGroupMapper.panelGroupList(new PanelGroupRequest(panelGroupDTO.getId()));
panelGroupDTO.setChildren(panelGroupDTOChildren);
@@ -66,7 +66,7 @@ public class PanelGroupService {
}));
}
public List<PanelGroupDTO> getDefaultTree(PanelGroupRequest panelGroupRequest){
public List<PanelGroupDTO> getDefaultTree(PanelGroupRequest panelGroupRequest) {
return extPanelGroupMapper.panelGroupList(panelGroupRequest);
}
@@ -86,32 +86,32 @@ public class PanelGroupService {
}
public void deleteCircle(String id){
public void deleteCircle(String id) {
Assert.notNull(id, "id cannot be null");
extPanelGroupMapper.deleteCircle(id);
}
public PanelGroupWithBLOBs findOne(String panelId){
return panelGroupMapper.selectByPrimaryKey(panelId);
public PanelGroupWithBLOBs findOne(String panelId) {
return panelGroupMapper.selectByPrimaryKey(panelId);
}
public PanelGroupDTO findOneBack(String panelId) throws Exception{
public PanelGroupDTO findOneBack(String panelId) throws Exception {
PanelGroupDTO panelGroupDTO = extPanelGroupMapper.panelGroup(panelId);
Assert.notNull(panelGroupDTO, "未查询到仪表盘信息");
PanelDesignExample panelDesignExample = new PanelDesignExample();
panelDesignExample.createCriteria().andPanelIdEqualTo(panelId);
List<PanelDesign> panelDesignList = panelDesignMapper.selectByExample(panelDesignExample);
if(CollectionUtils.isNotEmpty(panelDesignList)){
List<PanelDesign> panelDesignList = panelDesignMapper.selectByExample(panelDesignExample);
if (CollectionUtils.isNotEmpty(panelDesignList)) {
List<PanelDesignDTO> panelDesignDTOList = new ArrayList<>();
//TODO 加载所有视图和组件的数据
for(PanelDesign panelDesign:panelDesignList){
for (PanelDesign panelDesign : panelDesignList) {
//TODO 获取view 视图数据
ChartViewDTO chartViewDTO = chartViewService.getData(panelDesign.getComponentId());
ChartViewDTO chartViewDTO = chartViewService.getData(panelDesign.getComponentId(), null);
//TODO 获取systemComponent 系统组件数据(待开发)
PanelDesignDTO panelDesignDTO = new PanelDesignDTO(chartViewDTO);
BeanUtils.copyBean(panelDesignDTO,panelDesign);
BeanUtils.copyBean(panelDesignDTO, panelDesign);
panelDesignDTO.setKeepFlag(true);
panelDesignDTOList.add(panelDesignDTO);
}
@@ -123,14 +123,14 @@ public class PanelGroupService {
}
public List<ChartViewDTO> getUsableViews(String panelId) throws Exception{
public List<ChartViewDTO> getUsableViews(String panelId) throws Exception {
List<ChartViewDTO> chartViewDTOList = new ArrayList<>();
List<ChartView> allChartView = chartViewMapper.selectByExample(null);
Optional.ofNullable(allChartView).orElse(new ArrayList<>()).stream().forEach(chartView -> {
try {
chartViewDTOList.add(chartViewService.getData(chartView.getId()));
}catch (Exception e){
LOGGER.error("获取view详情出错"+chartView.getId(),e);
chartViewDTOList.add(chartViewService.getData(chartView.getId(), null));
} catch (Exception e) {
LOGGER.error("获取view详情出错" + chartView.getId(), e);
}
});
return chartViewDTOList;
@@ -150,24 +150,24 @@ public class PanelGroupService {
//TODO 更新panelDesign 信息
String panelId = request.getId();
Assert.notNull(panelId,"panelId should not be null");
Assert.notNull(panelId, "panelId should not be null");
//清理原有design
extPanelDesignMapper.deleteByPanelId(panelId);
//保存view 或者component design
Optional.ofNullable(request.getPanelDesigns()).orElse(new ArrayList<>()).stream().forEach(panelDesignDTO -> {
if(panelDesignDTO.isKeepFlag()) {
String componentId = "";
if(StringUtils.equals(PanelConstants.COMPONENT_TYPE_VIEW,panelDesignDTO.getComponentType())){
componentId = panelDesignDTO.getChartView().getId();
}else{
//预留 公共组件id获取
componentId = "";
}
panelDesignDTO.setPanelId(panelId);
panelDesignDTO.setComponentId(componentId);
panelDesignDTO.setUpdateTime(System.currentTimeMillis());
panelDesignMapper.insertSelective(panelDesignDTO);
}
if (panelDesignDTO.isKeepFlag()) {
String componentId = "";
if (StringUtils.equals(PanelConstants.COMPONENT_TYPE_VIEW, panelDesignDTO.getComponentType())) {
componentId = panelDesignDTO.getChartView().getId();
} else {
//预留 公共组件id获取
componentId = "";
}
panelDesignDTO.setPanelId(panelId);
panelDesignDTO.setComponentId(componentId);
panelDesignDTO.setUpdateTime(System.currentTimeMillis());
panelDesignMapper.insertSelective(panelDesignDTO);
}
});
}
}

View File

@@ -2,11 +2,11 @@ package io.dataease.service.spark;
import io.dataease.base.domain.DatasetTableField;
import io.dataease.commons.utils.CommonBeanFactory;
import io.dataease.controller.request.chart.ChartExtFilterRequest;
import io.dataease.dto.chart.ChartViewFieldDTO;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -22,12 +22,12 @@ import org.apache.spark.sql.*;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.storage.StorageLevel;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Service;
import scala.Tuple2;
import javax.annotation.Resource;
import java.math.BigDecimal;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Base64;
@@ -44,13 +44,12 @@ public class SparkCalc {
@Resource
private Environment env; // 保存了配置文件的信息
public List<String[]> getData(String hTable, List<DatasetTableField> fields, List<ChartViewFieldDTO> xAxis, List<ChartViewFieldDTO> yAxis, String tmpTable) throws Exception {
public List<String[]> getData(String hTable, List<DatasetTableField> fields, List<ChartViewFieldDTO> xAxis, List<ChartViewFieldDTO> yAxis, String tmpTable, List<ChartExtFilterRequest> requestList) throws Exception {
// Spark Context
SparkSession spark = CommonBeanFactory.getBean(SparkSession.class);
JavaSparkContext sparkContext = new JavaSparkContext(spark.sparkContext());
// Spark SQL Context
// SQLContext sqlContext = CommonBeanFactory.getBean(SQLContext.class);
SQLContext sqlContext = new SQLContext(sparkContext);
sqlContext.setConf("spark.sql.shuffle.partitions", env.getProperty("spark.sql.shuffle.partitions", "1"));
sqlContext.setConf("spark.default.parallelism", env.getProperty("spark.default.parallelism", "1"));
@@ -61,7 +60,7 @@ public class SparkCalc {
}
dataFrame.createOrReplaceTempView(tmpTable);
Dataset<Row> sql = sqlContext.sql(getSQL(xAxis, yAxis, tmpTable));
Dataset<Row> sql = sqlContext.sql(getSQL(xAxis, yAxis, tmpTable, requestList));
// transform
List<String[]> data = new ArrayList<>();
List<Row> list = sql.collectAsList();
@@ -81,7 +80,6 @@ public class SparkCalc {
JavaSparkContext sparkContext = new JavaSparkContext(spark.sparkContext());
// Spark SQL Context
// SQLContext sqlContext = CommonBeanFactory.getBean(SQLContext.class);
SQLContext sqlContext = new SQLContext(sparkContext);
sqlContext.setConf("spark.sql.shuffle.partitions", env.getProperty("spark.sql.shuffle.partitions", "1"));
sqlContext.setConf("spark.default.parallelism", env.getProperty("spark.default.parallelism", "1"));
@@ -90,12 +88,14 @@ public class SparkCalc {
public Dataset<Row> getHBaseDataAndCache(JavaSparkContext sparkContext, SQLContext sqlContext, String hTable, List<DatasetTableField> fields) throws Exception {
Scan scan = new Scan();
scan.addFamily(column_family.getBytes());
scan.addFamily(Bytes.toBytes(column_family));
for (DatasetTableField field : fields) {
scan.addColumn(Bytes.toBytes(column_family), Bytes.toBytes(field.getOriginName()));
}
ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
String scanToString = new String(Base64.getEncoder().encode(proto.toByteArray()));
// HBase config
// Configuration conf = CommonBeanFactory.getBean(Configuration.class);
org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
conf.set("hbase.zookeeper.quorum", env.getProperty("hbase.zookeeper.quorum"));
conf.set("hbase.zookeeper.property.clientPort", env.getProperty("hbase.zookeeper.property.clientPort"));
@@ -144,13 +144,13 @@ public class SparkCalc {
});
StructType structType = DataTypes.createStructType(structFields);
Dataset<Row> dataFrame = sqlContext.createDataFrame(rdd, structType).persist();
Dataset<Row> dataFrame = sqlContext.createDataFrame(rdd, structType).persist(StorageLevel.MEMORY_AND_DISK_SER());
CacheUtil.getInstance().addCacheData(hTable, dataFrame);
dataFrame.count();
return dataFrame;
}
public String getSQL(List<ChartViewFieldDTO> xAxis, List<ChartViewFieldDTO> yAxis, String table) {
public String getSQL(List<ChartViewFieldDTO> xAxis, List<ChartViewFieldDTO> yAxis, String table, List<ChartExtFilterRequest> extFilterRequestList) {
// 字段汇总 排序等
String[] field = yAxis.stream().map(y -> "CAST(" + y.getSummary() + "(" + y.getOriginName() + ") AS DECIMAL(20,2)) AS _" + y.getSummary() + "_" + y.getOriginName()).toArray(String[]::new);
String[] group = xAxis.stream().map(ChartViewFieldDTO::getOriginName).toArray(String[]::new);
@@ -161,7 +161,7 @@ public class SparkCalc {
StringUtils.join(group, ","),
StringUtils.join(field, ","),
table,
"",
transExtFilter(extFilterRequestList),// origin field filter and panel field filter,
StringUtils.join(group, ","),
StringUtils.join(order, ","));
if (sql.endsWith(",")) {
@@ -197,6 +197,14 @@ public class SparkCalc {
return " > ";
case "ge":
return " >= ";
case "in":
return " IN ";
case "not in":
return " NOT IN ";
case "like":
return " LIKE ";
case "not like":
return " NOT LIKE ";
case "null":
return " IS NULL ";
case "not_null":
@@ -205,4 +213,31 @@ public class SparkCalc {
return "";
}
}
public String transExtFilter(List<ChartExtFilterRequest> requestList) {
if (CollectionUtils.isEmpty(requestList)) {
return "";
}
StringBuilder filter = new StringBuilder();
for (ChartExtFilterRequest request : requestList) {
List<String> value = request.getValue();
if (CollectionUtils.isEmpty(value)) {
continue;
}
DatasetTableField field = request.getDatasetTableField();
filter.append(" AND ")
.append(field.getOriginName())
.append(" ")
.append(transFilterTerm(request.getOperator()))
.append(" ");
if (StringUtils.containsIgnoreCase(request.getOperator(), "in")) {
filter.append("('").append(StringUtils.join(value, "','")).append("')");
} else if (StringUtils.containsIgnoreCase(request.getOperator(), "like")) {
filter.append("'%").append(value.get(0)).append("%'");
} else {
filter.append("'").append(value.get(0)).append("'");
}
}
return filter.toString();
}
}