Spark SQL Study Notes
Implementing Hive MapJoin in Spark SQL
The key parameter is spark.sql.autoBroadcastJoinThreshold (see the parameter table under Performance Tuning below): tables smaller than this threshold are automatically broadcast to all executors when joined, which is Spark SQL's equivalent of a Hive MapJoin.
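A minimal sketch of forcing a map-side (broadcast) join with the broadcast hint from org.apache.spark.sql.functions; the table names and the join column "key" are hypothetical:

``` scala
import org.apache.spark.sql.functions.broadcast

// Hypothetical tables registered elsewhere; any two DataFrames would do.
val large = sqlContext.table("large_table")
val small = sqlContext.table("small_table")

// Hint that the small side should be broadcast to every executor (a map-side join),
// regardless of the spark.sql.autoBroadcastJoinThreshold setting.
val joined = large.join(broadcast(small), "key")
joined.show()
```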
## Definitions
- DataFrames
A DataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood. DataFrames can be constructed from a wide array of sources such as: structured data files, tables in Hive, external databases, or existing RDDs.
- Datasets
A Dataset is a new experimental interface added in Spark 1.6 that tries to provide the benefits of RDDs (strong typing, ability to use powerful lambda functions) with the benefits of Spark SQL’s optimized execution engine. A Dataset can be constructed from JVM objects and then manipulated using functional transformations (map, flatMap, filter, etc.).
## Overview
Entry point: SQLContext
``` scala
val sc: SparkContext // An existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// this is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._
```
Hive support: HiveContext
Features: a more complete HiveQL parser, access to Hive UDFs, and the ability to read data from Hive tables.
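A minimal sketch of creating one, assuming the same sc as in the snippet above (Spark 1.x API):

``` scala
// HiveContext extends SQLContext and adds HiveQL parsing and Hive metastore access.
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
```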
Creating DataFrames
With a SQLContext, applications can create DataFrames from an existing RDD, from a Hive table, or from data sources.
``` scala
val sc: SparkContext // An existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val df = sqlContext.read.json("examples/src/main/resources/people.json")
// Displays the content of the DataFrame to stdout
df.show()
// Print the schema in a tree format
df.printSchema()
// Select only the "name" column
df.select("name").show()
// Select everybody, but increment the age by 1
df.select(df("name"), df("age") + 1).show()
// Select people older than 21
df.filter(df("age") > 21).show()
// Count people by age
df.groupBy("age").count().show()
```
Running SQL
``` scala
val df = sqlContext.sql("SELECT * FROM table")
```
Creating Datasets
``` scala
// Encoders for most common types are automatically provided by importing sqlContext.implicits._
```
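A fuller sketch of creating Datasets, following the Spark 1.6 programming guide (the people.json path is the sample file shipped with Spark):

``` scala
import sqlContext.implicits._

// Encoders for primitive types come from sqlContext.implicits._
val ds = Seq(1, 2, 3).toDS()
ds.map(_ + 1).collect() // Array(2, 3, 4)

// Encoders are also created automatically for case classes.
case class Person(name: String, age: Long)
val people = Seq(Person("Andy", 32)).toDS()

// A DataFrame can be converted to a Dataset by providing a class; mapping is done by name.
val path = "examples/src/main/resources/people.json"
val personDS = sqlContext.read.json(path).as[Person]
```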
Interoperating with RDDs
Inferring the Schema Using Reflection
The Scala interface for Spark SQL supports automatically converting an RDD containing case classes to a DataFrame.
``` scala
// sc is an existing SparkContext.
```
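A fuller sketch of the reflection-based approach, following the Spark 1.x programming guide (people.txt is the sample file shipped with Spark):

``` scala
// sqlContext and the implicit RDD-to-DataFrame conversions from earlier sections.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._

// Define the schema with a case class; its fields become column names.
case class Person(name: String, age: Int)

// Create an RDD of Person objects, convert it to a DataFrame and register it as a table.
val people = sc.textFile("examples/src/main/resources/people.txt")
  .map(_.split(","))
  .map(p => Person(p(0), p(1).trim.toInt))
  .toDF()
people.registerTempTable("people")

// SQL statements can be run by using the sql methods provided by sqlContext.
val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
```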
Programmatically Specifying the Schema
- Create an RDD of Rows from the original RDD;
- Create the schema represented by a StructType matching the structure of Rows in the RDD created in Step 1.
- Apply the schema to the RDD of Rows via createDataFrame method provided by SQLContext.
``` scala
// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// Create an RDD
val people = sc.textFile("examples/src/main/resources/people.txt")
// The schema is encoded in a string
val schemaString = "name age"
// Import Row.
import org.apache.spark.sql.Row;
// Import Spark SQL data types
import org.apache.spark.sql.types.{StructType,StructField,StringType};
// Generate the schema based on the string of schema
val schema =
StructType(
schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
// Convert records of the RDD (people) to Rows.
val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))
// Apply the schema to the RDD.
val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)
// Register the DataFrames as a table.
peopleDataFrame.registerTempTable("people")
// SQL statements can be run by using the sql methods provided by sqlContext.
val results = sqlContext.sql("SELECT name FROM people")
// The results of SQL queries are DataFrames and support all the normal RDD operations.
// The columns of a row in the result can be accessed by field index or by field name.
results.map(t => "Name: " + t(0)).collect().foreach(println)
```
Data Sources
Generic Load/Save Functions
``` scala
val df = sqlContext.read.load("examples/src/main/resources/users.parquet")
```
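The matching save side, a sketch following the load/save examples in the Spark programming guide (column names come from the bundled users.parquet and people.json sample files):

``` scala
// df is the DataFrame loaded above; save uses the default data source (parquet).
df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")

// The data source format can also be specified manually instead of using the default.
val people = sqlContext.read.format("json").load("examples/src/main/resources/people.json")
people.select("name", "age").write.format("parquet").save("namesAndAges.parquet")
```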
Run SQL on files directly
``` scala
val df = sqlContext.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
```
Saving to Persistent Tables
When working with a HiveContext, DataFrames can also be saved as persistent tables using the saveAsTable command. Unlike the registerTempTable command, saveAsTable will materialize the contents of the DataFrame and create a pointer to the data in the Hive metastore.
In other words, it will actually create a managed (internal) table in Hive.
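A minimal sketch, assuming a HiveContext and an existing DataFrame df; the table name people_persistent is hypothetical:

``` scala
// Materializes df as a managed table registered in the Hive metastore.
df.write.saveAsTable("people_persistent")

// The table outlives the session and can be read back by name later.
val restored = hiveContext.table("people_persistent")
```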
Parquet
Partition Discovery
Currently, numeric data types and string type are supported.
Configuration: spark.sql.sources.partitionColumnTypeInference.enabled (default true) controls whether partition column types are inferred automatically; when disabled, partition columns are read as strings.
Directory layout: path/to/table/gender=male
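A minimal sketch of partition discovery, assuming Parquet files stored under gender=... subdirectories of the layout above:

``` scala
// Reading the table root lets Spark discover `gender` (from the directory names)
// as an extra partition column alongside the columns stored in the Parquet files.
val partitioned = sqlContext.read.parquet("path/to/table")
partitioned.printSchema()
```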
Schema Merging
Parquet schema merging (schema evolution) is supported.
It is turned off by default; it can be enabled with the mergeSchema data source option (as in the example below) or the spark.sql.parquet.mergeSchema setting.
``` scala
// sqlContext from the previous example is used in this example.
// This is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._
// Create a simple DataFrame, stored into a partition directory
val df1 = sc.makeRDD(1 to 5).map(i => (i, i * 2)).toDF("single", "double")
df1.write.parquet("data/test_table/key=1")
// Create another DataFrame in a new partition directory,
// adding a new column and dropping an existing column
val df2 = sc.makeRDD(6 to 10).map(i => (i, i * 3)).toDF("single", "triple")
df2.write.parquet("data/test_table/key=2")
// Read the partitioned table
val df3 = sqlContext.read.option("mergeSchema", "true").parquet("data/test_table")
df3.printSchema()
// The final schema consists of all 3 columns in the Parquet files together
// with the partitioning column appeared in the partition directory paths.
// root
// |-- single: int (nullable = true)
// |-- double: int (nullable = true)
// |-- triple: int (nullable = true)
// |-- key : int (nullable = true)
```
Hive support
``` scala
// sc is an existing SparkContext.
```
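Continuing from that SparkContext, a sketch following the Hive example in the Spark 1.x programming guide (the kv1.txt path is the one used in the Spark examples and is an assumption here):

``` scala
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

// Create a Hive table and load data into it with plain HiveQL.
hiveContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
hiveContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

// Queries are expressed in HiveQL.
hiveContext.sql("FROM src SELECT key, value").collect().foreach(println)
```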
JDBC To Other Databases
Other data sources can be read over JDBC.
``` scala
val jdbcDF = sqlContext.read.format("jdbc").options(
  Map("url" -> "jdbc:postgresql:dbserver",
      "dbtable" -> "schema.tablename")).load()
```
Performance Tuning
Caching Data In Memory
- sqlContext.cacheTable("tableName") / sqlContext.uncacheTable("tableName")
- dataFrame.cache()
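A minimal usage sketch, assuming the "people" temp table registered earlier:

``` scala
// Cache the table in the in-memory columnar format, query it, then release the memory.
sqlContext.cacheTable("people")
sqlContext.sql("SELECT COUNT(*) FROM people").show()
sqlContext.uncacheTable("people")
```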
Parameter | Default | Description |
---|---|---|
spark.sql.inMemoryColumnarStorage.compressed | true | Whether to compress cached columnar data (a codec is chosen per column based on statistics) |
spark.sql.inMemoryColumnarStorage.batchSize | 10000 | Batch size (rows) used for columnar caching |
spark.sql.autoBroadcastJoinThreshold | 10485760 (10 MB) | Maximum table size, in bytes, that will be broadcast to all workers when performing a join |
spark.sql.tungsten.enabled | true | Whether to enable the Tungsten execution backend |
spark.sql.shuffle.partitions | 200 | Number of partitions used when shuffling data for joins or aggregations |
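These settings can be changed per SQLContext with setConf; a minimal sketch with arbitrary example values:

``` scala
// Broadcast tables up to 50 MB and use fewer shuffle partitions for small jobs.
sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", (50 * 1024 * 1024).toString)
sqlContext.setConf("spark.sql.shuffle.partitions", "100")
```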
Distributed SQL Engine
Start the Thrift server, then connect to it with beeline or over JDBC.
- Running the Thrift JDBC/ODBC server
The Thrift JDBC/ODBC server implemented here corresponds to the HiveServer2 in Hive 1.2.1. You can test the JDBC server with the beeline script that comes with either Spark or Hive 1.2.1.
To start the JDBC/ODBC server, run the following in the Spark directory:
./sbin/start-thriftserver.sh
or
./sbin/start-thriftserver.sh \
--hiveconf hive.server2.thrift.port=<listening-port> \
--hiveconf hive.server2.thrift.bind.host=<listening-host> \
--master <master-uri>
- Use beeline to test the Thrift JDBC/ODBC server:
./bin/beeline
beeline> !connect jdbc:hive2://localhost:10000
Other Features
Caching
NOTE: CACHE TABLE tbl is now eager by default, not lazy, so there is no need to trigger cache materialization manually anymore.
``` sql
CACHE [LAZY] TABLE [AS SELECT] ...
CACHE TABLE logs_last_month;
UNCACHE TABLE logs_last_month;
```
Dynamic Partitioning
``` scala
df.write.partitionBy("year", "month").saveAsTable(...)
```