diff --git a/backend/pom.xml b/backend/pom.xml
index 0add4657ae..38822a30a2 100644
--- a/backend/pom.xml
+++ b/backend/pom.xml
@@ -371,6 +371,18 @@
org.apache.spark
spark-sql_2.12
${spark.version}
+
+
+ janino
+ org.codehaus.janino
+
+
+
+
+
+ org.codehaus.janino
+ janino
+ 3.0.8
diff --git a/backend/src/main/java/io/dataease/config/CommonConfig.java b/backend/src/main/java/io/dataease/config/CommonConfig.java
index 229ee6a069..09bc21dad6 100644
--- a/backend/src/main/java/io/dataease/config/CommonConfig.java
+++ b/backend/src/main/java/io/dataease/config/CommonConfig.java
@@ -3,12 +3,12 @@ package io.dataease.config;
import com.fit2cloud.autoconfigure.QuartzAutoConfiguration;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import org.springframework.boot.autoconfigure.AutoConfigureBefore;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
-import org.springframework.context.annotation.DependsOn;
import org.springframework.core.env.Environment;
import javax.annotation.Resource;
@@ -23,7 +23,7 @@ public class CommonConfig {
@Bean
@ConditionalOnMissingBean
- public org.apache.hadoop.conf.Configuration configuration(){
+ public org.apache.hadoop.conf.Configuration configuration() {
org.apache.hadoop.conf.Configuration configuration = new org.apache.hadoop.conf.Configuration();
configuration.set("hbase.zookeeper.quorum", env.getProperty("hbase.zookeeper.quorum"));
configuration.set("hbase.zookeeper.property.clientPort", env.getProperty("hbase.zookeeper.property.clientPort"));
@@ -34,10 +34,19 @@ public class CommonConfig {
@Bean
@ConditionalOnMissingBean
- public JavaSparkContext javaSparkContext(){
- SparkConf conf = new SparkConf().setAppName(env.getProperty("spark.appName", "DataeaseJob") ).setMaster(env.getProperty("spark.master", "local[*]") );
+ public JavaSparkContext javaSparkContext() {
+ SparkConf conf = new SparkConf().setAppName(env.getProperty("spark.appName", "DataeaseJob")).setMaster(env.getProperty("spark.master", "local[*]"));
SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
return sc;
}
+
+ @Bean
+ @ConditionalOnMissingBean
+ public SQLContext sqlContext(JavaSparkContext javaSparkContext) {
+ SQLContext sqlContext = new SQLContext(javaSparkContext);
+ sqlContext.setConf("spark.sql.shuffle.partitions", env.getProperty("spark.sql.shuffle.partitions", "1"));
+ sqlContext.setConf("spark.default.parallelism", env.getProperty("spark.default.parallelism", "1"));
+ return sqlContext;
+ }
}
diff --git a/backend/src/main/java/io/dataease/dto/chart/ChartViewFieldDTO.java b/backend/src/main/java/io/dataease/dto/chart/ChartViewFieldDTO.java
index 9eae0f4a58..e4a7127d77 100644
--- a/backend/src/main/java/io/dataease/dto/chart/ChartViewFieldDTO.java
+++ b/backend/src/main/java/io/dataease/dto/chart/ChartViewFieldDTO.java
@@ -2,6 +2,7 @@ package io.dataease.dto.chart;
import lombok.Data;
+import java.io.Serializable;
import java.util.List;
/**
@@ -9,7 +10,7 @@ import java.util.List;
* @Date 2021/3/11 1:18 下午
*/
@Data
-public class ChartViewFieldDTO {
+public class ChartViewFieldDTO implements Serializable {
private String id;
private String tableId;
diff --git a/backend/src/main/java/io/dataease/service/chart/ChartViewService.java b/backend/src/main/java/io/dataease/service/chart/ChartViewService.java
index a283c131d1..202f1d9a17 100644
--- a/backend/src/main/java/io/dataease/service/chart/ChartViewService.java
+++ b/backend/src/main/java/io/dataease/service/chart/ChartViewService.java
@@ -2,7 +2,10 @@ package io.dataease.service.chart;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
-import io.dataease.base.domain.*;
+import io.dataease.base.domain.ChartViewExample;
+import io.dataease.base.domain.ChartViewWithBLOBs;
+import io.dataease.base.domain.DatasetTable;
+import io.dataease.base.domain.Datasource;
import io.dataease.base.mapper.ChartViewMapper;
import io.dataease.commons.utils.BeanUtils;
import io.dataease.controller.request.chart.ChartViewRequest;
@@ -15,8 +18,8 @@ import io.dataease.dto.chart.ChartViewDTO;
import io.dataease.dto.chart.ChartViewFieldDTO;
import io.dataease.dto.chart.Series;
import io.dataease.dto.dataset.DataTableInfoDTO;
-import io.dataease.service.dataset.DataSetTableFieldsService;
import io.dataease.service.dataset.DataSetTableService;
+import io.dataease.service.spark.SparkCalc;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;
@@ -25,7 +28,6 @@ import javax.annotation.Resource;
import java.math.BigDecimal;
import java.text.MessageFormat;
import java.util.*;
-import java.util.stream.Collectors;
/**
* @Author gin
@@ -40,7 +42,7 @@ public class ChartViewService {
@Resource
private DatasourceService datasourceService;
@Resource
- private DataSetTableFieldsService dataSetTableFieldsService;
+ private SparkCalc sparkCalc;
public ChartViewWithBLOBs save(ChartViewWithBLOBs chartView) {
long timestamp = System.currentTimeMillis();
@@ -102,22 +104,27 @@ public class ChartViewService {
// 获取数据集
DatasetTable table = dataSetTableService.get(view.getTableId());
- // todo 判断连接方式,直连或者定时抽取 table.mode
- Datasource ds = datasourceService.get(table.getDataSourceId());
- DatasourceProvider datasourceProvider = ProviderFactory.getProvider(ds.getType());
- DatasourceRequest datasourceRequest = new DatasourceRequest();
- datasourceRequest.setDatasource(ds);
- DataTableInfoDTO dataTableInfoDTO = new Gson().fromJson(table.getInfo(), DataTableInfoDTO.class);
- if (StringUtils.equalsIgnoreCase(table.getType(), "db")) {
- datasourceRequest.setTable(dataTableInfoDTO.getTable());
- datasourceRequest.setQuery(getSQL(ds.getType(), dataTableInfoDTO.getTable(), xAxis, yAxis));
- } else if (StringUtils.equalsIgnoreCase(table.getType(), "sql")) {
- datasourceRequest.setQuery(getSQL(ds.getType(), " (" + dataTableInfoDTO.getSql() + ") AS tmp ", xAxis, yAxis));
+ // 判断连接方式,直连或者定时抽取 table.mode
+ List data = new ArrayList<>();
+ if (table.getMode() == 0) {// 直连
+ Datasource ds = datasourceService.get(table.getDataSourceId());
+ DatasourceProvider datasourceProvider = ProviderFactory.getProvider(ds.getType());
+ DatasourceRequest datasourceRequest = new DatasourceRequest();
+ datasourceRequest.setDatasource(ds);
+ DataTableInfoDTO dataTableInfoDTO = new Gson().fromJson(table.getInfo(), DataTableInfoDTO.class);
+ if (StringUtils.equalsIgnoreCase(table.getType(), "db")) {
+ datasourceRequest.setTable(dataTableInfoDTO.getTable());
+ datasourceRequest.setQuery(getSQL(ds.getType(), dataTableInfoDTO.getTable(), xAxis, yAxis));
+ } else if (StringUtils.equalsIgnoreCase(table.getType(), "sql")) {
+ datasourceRequest.setQuery(getSQL(ds.getType(), " (" + dataTableInfoDTO.getSql() + ") AS tmp ", xAxis, yAxis));
+ }
+ data = datasourceProvider.getData(datasourceRequest);
+ } else if (table.getMode() == 1) {// 抽取
+ DataTableInfoDTO dataTableInfoDTO = new Gson().fromJson(table.getInfo(), DataTableInfoDTO.class);
+ data = sparkCalc.getData(dataTableInfoDTO.getTable() + "-" + table.getDataSourceId(), xAxis, yAxis, "tmp");// todo hBase table name maybe change
}
- List data = datasourceProvider.getData(datasourceRequest);
-
- // todo 处理结果,目前做一个单系列图表,后期图表组件再扩展
+ // 图表组件可再扩展
for (ChartViewFieldDTO y : yAxis) {
Series series1 = new Series();
series1.setName(y.getName());
@@ -163,7 +170,7 @@ public class ChartViewService {
}
public String transMysqlSQL(String table, List xAxis, List yAxis) {
- // TODO 字段汇总 排序等
+ // 字段汇总 排序等
String[] field = yAxis.stream().map(y -> "CAST(" + y.getSummary() + "(" + y.getOriginName() + ") AS DECIMAL(20,2)) AS _" + y.getSummary() + "_" + y.getOriginName()).toArray(String[]::new);
String[] group = xAxis.stream().map(ChartViewFieldDTO::getOriginName).toArray(String[]::new);
String[] order = yAxis.stream().filter(y -> StringUtils.isNotEmpty(y.getSort()) && !StringUtils.equalsIgnoreCase(y.getSort(), "none"))
diff --git a/backend/src/main/java/io/dataease/service/spark/SparkCalc.java b/backend/src/main/java/io/dataease/service/spark/SparkCalc.java
new file mode 100644
index 0000000000..1006d898bf
--- /dev/null
+++ b/backend/src/main/java/io/dataease/service/spark/SparkCalc.java
@@ -0,0 +1,169 @@
+package io.dataease.service.spark;
+
+import io.dataease.commons.utils.CommonBeanFactory;
+import io.dataease.dto.chart.ChartViewFieldDTO;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.sql.*;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.springframework.stereotype.Service;
+import scala.Tuple2;
+
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.List;
+
+import static org.reflections8.Reflections.collect;
+
+/**
+ * @Author gin
+ * @Date 2021/3/26 3:49 下午
+ */
+@Service
+public class SparkCalc {
+ private static String column_family = "dataease";
+
+ public List getData(String hTable, List xAxis, List yAxis, String tmpTable) throws Exception {
+ Scan scan = new Scan();
+ scan.addFamily(column_family.getBytes());
+ ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
+ String scanToString = new String(Base64.getEncoder().encode(proto.toByteArray()));
+
+ JavaSparkContext sparkContext = CommonBeanFactory.getBean(JavaSparkContext.class);
+ Configuration conf = CommonBeanFactory.getBean(Configuration.class);
+ conf.set(TableInputFormat.INPUT_TABLE, hTable);
+ conf.set(TableInputFormat.SCAN, scanToString);
+
+ JavaPairRDD pairRDD = sparkContext.newAPIHadoopRDD(conf, TableInputFormat.class, ImmutableBytesWritable.class, Result.class);
+ JavaRDD rdd = pairRDD.map((Function, Row>) immutableBytesWritableResultTuple2 ->
+ {
+ Result result = immutableBytesWritableResultTuple2._2;
+ List