Spark2.0学习笔记

Catalog API

Spark中的DataSet和Dataframe API支持结构化分析。结构化分析的一个重要的方面是管
理元数据。这些元数据可能是一些临时元数据(比如临时表)、SQLContext上注册的UDF以及
持久化的元数据(比如Hivemeta store或者HCatalog)。
Spark的早期版本是没有标准的API来访问这些元数据的。用户通常使用查询语句(比
如 show tables )来查询这些元数据。这些查询通常需要操作原始的字符串,而且不同元数据
类型的操作也是不一样的。
这种情况在Spark 2.0中得到改变。Spark 2.0中添加了标准的API(称为catalog)来访问
Spark SQL中的元数据。这个API既可以操作Spark SQL,也可以操作Hive元数据。

1
2
3
import org.apache.spark.sql.SparkSession
val sparkSession = SparkSession.builder.appName("spark session example").enableHiveSupport().getOrCreate()
val catalog = sparkSession.catalog
  • Querying the databases
    一旦创建好catalog对象之后,可以使用它来查询元数据中的数据库,catalog上的
    API返回的结果全部都是dataset。
    listDatabases 返回元数据中所有的数据库。默认情况下,元数据仅仅只有名为default的数
    据库。如果是Hive元数据,那么它会从Hive元数据中获取所有的数据库。 listDatabases 返
    回的类型是dataset,所以我们可以使用Dataset上的所有操作来查询元数据。

    1
    catalog.listDatabases().select("name").show(false)
  • 使用createTempView注册Dataframe
    在Spark的早期版本,我们使用 registerTempTable 来注册Dataframe。然而在Spark
    2.0中,这个API已经被遗弃了。 registerTempTable 名字很让人误解,因为用户会认为这个函
    数会将Dataframe持久化并且保证这个临时表,但是实际上并不是这样的,所以社区才有意将它
    替换成 createTempView 。 createTempView 的使用方法如下:

    1
    df.createTempView("iteblog")
  • 查询表
    正如我们可以展示出元数据中的所有数据库一样,我们也可以展示出元数据中某个数据库中
    的表。它会展示出Spark SQL中所有注册的临时表。同时可以展示出Hive中默认数据库(也就是
    default)中的表。如下:

    1
    catalog.listTables().select("name").show(false)
  • 判断某个表是否缓存

    1
    2
    catalog.isCached("iteblog")
    df.cache()
  • 删除view
    以使用catalog提供的API来删除view。如果是Spark SQL情况,那么它会删除事先注册好的view;如果是hive情况,那么它会从元数据中删除表

    1
    catalog.dropTempView("iteblog")
  • 查询已经注册的函数
    仅可以使用Catalog API操作表,还可以用它操作UDF。下面代码片段展示
    SparkSession上所有已经注册号的函数,当然也包括了Spark内置的函数。

    1
    catalog.listFunctions().select("name","className","isTemporary").show(100, false)

Catalyst优化器

Spark SQL使用Catalyst优化所有的查询,包括spark sql和dataframe dsl。这个优化器的使用使得查询比直接使用RDD要快很多。Spark在每个版本都会对Catalyst进行优化以便提高查询性能,而不需要用户修改他们的代码。
Catalyst是一个单独的模块类库,这个模块是基于规则的系统。这个框架中的每个规则都是针对某个特定的情况来优化的。比如: ConstantFolding 规则用于移除查询中的常量表达式。
在Spark的早期版本,如果需要添加自定义的优化规则,我们需要修改Spark的源码,这在很多情况下是不太可取的,比如我们仅仅需要优化特定的领域或者场景。所以开发社区想有一种可插拨的方式在Catalyst中添加优化规则。值得高兴的是,Spark 2.0提供了这种实验式的API,我们可以基于这些API添加自定义的优化规则。本文将介绍如何编写自定义的优化规则,并将这些规则添加到Catalyst中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
 	// dataframe的优化计划(Optimized plan)
val df = spark.read.option("header", "true").csv("file:///user/iteblog/sales.csv")
val multipliedDF = df.selectExpr("amountPaid * 1")
println(multipliedDF.queryExecution.optimizedPlan.numberedTreeString)
// Spark自动对每一行的 amountPaid 乘上 1.0 。但是这不是最优计划!因为如果是乘以1,最终的结果是一样的。所有我们可以利用这个知识来编写自定义的优化规则,并将这个规则加入到Catalyst中

// 自定义优化计划
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.{Literal, Multiply}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
object MultiplyOptimizationRule extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
case Multiply(left, right) if right.isInstanceOf[Literal] &&
right.asInstanceOf[Literal].value.asInstanceOf[Double] == 1.0 =>
println("optimization of one applied")
left
}
}
// 我们扩展了Rule,Rule是直接操作逻辑计划的。绝大多数的规则都是使用Scala中的模式匹
配。在上面的代码中,我们首先判断优化的操作数(operand)是否是文字(literal),然后判断其值
是否是1.0。为了简便起见,我们限定了1出现的位置,如果1出现在左边,这个优化规则将不起
作用。但是我们可以仿照上面的示例轻松地实现.通过上面的规则,如果右边的值是1,我们将直接返回左边的值。


// 将自定义的优化规则加入到Catalyst中
spark.experimental.extraOptimizations = Seq(MultiplyOptimizationRule)

// 使用自定义优化规则
val newMultipliedDF = df.selectExpr("amountPaid * 1")
println(newMultipliedDF.queryExecution.optimizedPlan.numberedTreeString)

Spark SQL中Window API

Spark SQL中的window API是从1.4版本开始引入的,以便支持更智能的分组功能。这个功
能对于那些有SQL背景的人来说非常有用;但是在Spark 1.x中,window API一大缺点就是无法
使用时间来创建窗口。时间在诸如金融、电信等领域有着非常重要的角色,基于时间来理解数据
变得至关重要。
不过值得高兴的是,在Spark 2.0中,window API内置也支持time windows!Spark SQL
中的time windows和Spark Streaming中的time windows非常类似。

  • tumbling window
    所有的timewindow API需要一个类型为timestamp的列。

    by```语句中使用。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    window方法的第一个参数指定了时间所在的列;第二个参数指定了窗口的持续时间
    (duration),它的单位可以是seconds、minutes、hours、days或者weeks。创建好窗口之
    后,我们可以计算平均值。
    ``` scala
    // 使用了内置的year函数来提取出日期中的年
    val stocks2016 = stocksDF.filter("year(Date)==2016")

    // 对每个星期创建一个窗口,这种类型的窗口通常被称为tumbling window
    val tumblingWindowDS = stocks2016.groupBy(window(stocks2016.col("Date"),"1 week")).agg(avg("Close").as("weekly_average"))

    def printWindow(windowDF:DataFrame, aggCol:String) ={
    windowDF.sort("window.start").
    select("window.start","window.end",s"$aggCol").
    show(truncate = false)
    }
    // 打印window的值
    printWindow(tumblingWindowDS,"weekly_average")

  • 用sliding window(滑动窗口)
    到目前为止,没有相关API来创建带有开始时间的tumbling
    window,但是我们可以通过将窗口时间(window duration)和滑动时间(slide duration)设置成
    一样来创建带有开始时间的tumbling window。

1
2
3
// 4 days 参数就是开始时间的偏移量;前两个参数分别代表窗口时间和滑动时间
val iteblogWindowWithStartTime = stocks2016.groupBy(window(stocks2016.col("Date"),"1
week","1 week", "4 days")).agg(avg("Close").as("weekly_average"))