From c4d671fc31d811aaba449e0d57c643dd4bf83470 Mon Sep 17 00:00:00 2001 From: junjie Date: Thu, 1 Apr 2021 17:30:06 +0800 Subject: [PATCH] feat(backend):spark calc fix --- .../service/chart/ChartViewService.java | 3 +- .../io/dataease/service/spark/SparkCalc.java | 36 +++++++++++++------ 2 files changed, 28 insertions(+), 11 deletions(-) diff --git a/backend/src/main/java/io/dataease/service/chart/ChartViewService.java b/backend/src/main/java/io/dataease/service/chart/ChartViewService.java index e919224bd1..3bfb5bba5a 100644 --- a/backend/src/main/java/io/dataease/service/chart/ChartViewService.java +++ b/backend/src/main/java/io/dataease/service/chart/ChartViewService.java @@ -119,7 +119,8 @@ public class ChartViewService { data = datasourceProvider.getData(datasourceRequest); } else if (table.getMode() == 1) {// 抽取 DataTableInfoDTO dataTableInfoDTO = new Gson().fromJson(table.getInfo(), DataTableInfoDTO.class); - data = sparkCalc.getData(dataTableInfoDTO.getTable() + "-" + table.getDataSourceId(), xAxis, yAxis, "tmp");// todo hBase table name maybe change + String tableName = dataTableInfoDTO.getTable() + "-" + table.getDataSourceId();// todo hBase table name maybe change + data = sparkCalc.getData(tableName, xAxis, yAxis, view.getId().split("-")[0]); } // 图表组件可再扩展 diff --git a/backend/src/main/java/io/dataease/service/spark/SparkCalc.java b/backend/src/main/java/io/dataease/service/spark/SparkCalc.java index dea6b5aa63..52668d7603 100644 --- a/backend/src/main/java/io/dataease/service/spark/SparkCalc.java +++ b/backend/src/main/java/io/dataease/service/spark/SparkCalc.java @@ -16,16 +16,15 @@ import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.FlatMapFunction; -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Row; -import org.apache.spark.sql.RowFactory; -import org.apache.spark.sql.SQLContext; +import org.apache.spark.sql.*; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; +import org.springframework.core.env.Environment; import org.springframework.stereotype.Service; import scala.Tuple2; +import javax.annotation.Resource; import java.text.MessageFormat; import java.util.ArrayList; import java.util.Base64; @@ -39,6 +38,8 @@ import java.util.List; @Service public class SparkCalc { private static String column_family = "dataease"; + @Resource + private Environment env; // 保存了配置文件的信息 public List getData(String hTable, List xAxis, List yAxis, String tmpTable) throws Exception { Scan scan = new Scan(); @@ -46,8 +47,21 @@ public class SparkCalc { ClientProtos.Scan proto = ProtobufUtil.toScan(scan); String scanToString = new String(Base64.getEncoder().encode(proto.toByteArray())); - JavaSparkContext sparkContext = CommonBeanFactory.getBean(JavaSparkContext.class); - Configuration conf = CommonBeanFactory.getBean(Configuration.class); + // Spark Context +// JavaSparkContext sparkContext = CommonBeanFactory.getBean(JavaSparkContext.class); + SparkSession spark = SparkSession.builder() + .appName(env.getProperty("spark.appName", "DataeaseJob")) + .master(env.getProperty("spark.master", "local[*]")) + .config("spark.scheduler.mode", "FAIR") + .getOrCreate(); + JavaSparkContext sparkContext = new JavaSparkContext(spark.sparkContext()); + + // HBase config +// Configuration conf = CommonBeanFactory.getBean(Configuration.class); + org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration(); + conf.set("hbase.zookeeper.quorum", env.getProperty("hbase.zookeeper.quorum")); + conf.set("hbase.zookeeper.property.clientPort", env.getProperty("hbase.zookeeper.property.clientPort")); + conf.set("hbase.client.retries.number", env.getProperty("hbase.client.retries.number", "1")); conf.set(TableInputFormat.INPUT_TABLE, hTable); conf.set(TableInputFormat.SCAN, scanToString); @@ -103,12 +117,15 @@ public class SparkCalc { }); StructType structType = DataTypes.createStructType(structFields); - SQLContext sqlContext = CommonBeanFactory.getBean(SQLContext.class); + // Spark SQL Context +// SQLContext sqlContext = CommonBeanFactory.getBean(SQLContext.class); + SQLContext sqlContext = new SQLContext(sparkContext); + sqlContext.setConf("spark.sql.shuffle.partitions", env.getProperty("spark.sql.shuffle.partitions", "1")); + sqlContext.setConf("spark.default.parallelism", env.getProperty("spark.default.parallelism", "1")); + Dataset dataFrame = sqlContext.createDataFrame(rdd, structType); dataFrame.createOrReplaceTempView(tmpTable); - Dataset sql = sqlContext.sql(getSQL(xAxis, yAxis, tmpTable)); - // transform List data = new ArrayList<>(); List list = sql.collectAsList(); @@ -119,7 +136,6 @@ public class SparkCalc { } data.add(r); } - return data; }