Catalog API
Spark中的DataSet和Dataframe API支持结构化分析。结构化分析的一个重要的方面是管
理元数据。这些元数据可能是一些临时元数据(比如临时表)、SQLContext上注册的UDF以及
持久化的元数据(比如Hivemeta store或者HCatalog)。
Spark的早期版本是没有标准的API来访问这些元数据的。用户通常使用查询语句(比
如 show tables )来查询这些元数据。这些查询通常需要操作原始的字符串,而且不同元数据
类型的操作也是不一样的。
这种情况在Spark 2.0中得到改变。Spark 2.0中添加了标准的API(称为catalog)来访问
Spark SQL中的元数据。这个API既可以操作Spark SQL,也可以操作Hive元数据。
1 | import org.apache.spark.sql.SparkSession |
Querying the databases
一旦创建好catalog对象之后,可以使用它来查询元数据中的数据库,catalog上的
API返回的结果全部都是dataset。
listDatabases 返回元数据中所有的数据库。默认情况下,元数据仅仅只有名为default的数
据库。如果是Hive元数据,那么它会从Hive元数据中获取所有的数据库。 listDatabases 返
回的类型是dataset,所以我们可以使用Dataset上的所有操作来查询元数据。1
catalog.listDatabases().select("name").show(false)
使用createTempView注册Dataframe
在Spark的早期版本,我们使用 registerTempTable 来注册Dataframe。然而在Spark
2.0中,这个API已经被遗弃了。 registerTempTable 名字很让人误解,因为用户会认为这个函
数会将Dataframe持久化并且保证这个临时表,但是实际上并不是这样的,所以社区才有意将它
替换成 createTempView 。 createTempView 的使用方法如下:1
df.createTempView("iteblog")
查询表
正如我们可以展示出元数据中的所有数据库一样,我们也可以展示出元数据中某个数据库中
的表。它会展示出Spark SQL中所有注册的临时表。同时可以展示出Hive中默认数据库(也就是
default)中的表。如下:1
catalog.listTables().select("name").show(false)
判断某个表是否缓存
1
2catalog.isCached("iteblog")
df.cache()删除view
以使用catalog提供的API来删除view。如果是Spark SQL情况,那么它会删除事先注册好的view;如果是hive情况,那么它会从元数据中删除表1
catalog.dropTempView("iteblog")
查询已经注册的函数
仅可以使用Catalog API操作表,还可以用它操作UDF。下面代码片段展示
SparkSession上所有已经注册号的函数,当然也包括了Spark内置的函数。1
catalog.listFunctions().select("name","className","isTemporary").show(100, false)
Catalyst优化器
Spark SQL使用Catalyst优化所有的查询,包括spark sql和dataframe dsl。这个优化器的使用使得查询比直接使用RDD要快很多。Spark在每个版本都会对Catalyst进行优化以便提高查询性能,而不需要用户修改他们的代码。
Catalyst是一个单独的模块类库,这个模块是基于规则的系统。这个框架中的每个规则都是针对某个特定的情况来优化的。比如: ConstantFolding 规则用于移除查询中的常量表达式。
在Spark的早期版本,如果需要添加自定义的优化规则,我们需要修改Spark的源码,这在很多情况下是不太可取的,比如我们仅仅需要优化特定的领域或者场景。所以开发社区想有一种可插拨的方式在Catalyst中添加优化规则。值得高兴的是,Spark 2.0提供了这种实验式的API,我们可以基于这些API添加自定义的优化规则。本文将介绍如何编写自定义的优化规则,并将这些规则添加到Catalyst中。
1 | // dataframe的优化计划(Optimized plan) |
Spark SQL中Window API
Spark SQL中的window API是从1.4版本开始引入的,以便支持更智能的分组功能。这个功
能对于那些有SQL背景的人来说非常有用;但是在Spark 1.x中,window API一大缺点就是无法
使用时间来创建窗口。时间在诸如金融、电信等领域有着非常重要的角色,基于时间来理解数据
变得至关重要。
不过值得高兴的是,在Spark 2.0中,window API内置也支持time windows!Spark SQL
中的time windows和Spark Streaming中的time windows非常类似。
tumbling window
所有的timewindow API需要一个类型为timestamp的列。by```语句中使用。 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17window方法的第一个参数指定了时间所在的列;第二个参数指定了窗口的持续时间
(duration),它的单位可以是seconds、minutes、hours、days或者weeks。创建好窗口之
后,我们可以计算平均值。
``` scala
// 使用了内置的year函数来提取出日期中的年
val stocks2016 = stocksDF.filter("year(Date)==2016")
// 对每个星期创建一个窗口,这种类型的窗口通常被称为tumbling window
val tumblingWindowDS = stocks2016.groupBy(window(stocks2016.col("Date"),"1 week")).agg(avg("Close").as("weekly_average"))
def printWindow(windowDF:DataFrame, aggCol:String) ={
windowDF.sort("window.start").
select("window.start","window.end",s"$aggCol").
show(truncate = false)
}
// 打印window的值
printWindow(tumblingWindowDS,"weekly_average")用sliding window(滑动窗口)
到目前为止,没有相关API来创建带有开始时间的tumbling
window,但是我们可以通过将窗口时间(window duration)和滑动时间(slide duration)设置成
一样来创建带有开始时间的tumbling window。
1 | // 4 days 参数就是开始时间的偏移量;前两个参数分别代表窗口时间和滑动时间 |