Spark 资料整理

一 快速入门

Apache Spark快速入门:基本概念和例子(1)
Apache Spark快速入门:基本概念和例子(2)
Spark Streaming整体执行流程http://flykobe.com/index.php/2016/03/22/spark-streaming-execution/

二 应用调试

2.1 Spark应用程序运行的日志存在哪里

Spark应用程序运行的日志存在哪里
Spark日志确切的存放路径和部署模式相关:

  1. 如果是Spark Standalone模式,我们可以直接在Master UI界面查看应用程序的日志,在默认情况下这些日志是存储在worker节点的work目录下,这个目录可以通过SPARK_WORKER_DIR参数进行配置。
  2. 如果是Mesos模式,我们同样可以通过Mesos的Master UI界面上看到相关应用程序的日志,这些日志是存储在Mesos slave的work目录下。
  3. 如果是YARN模式,最简单地收集日志的方式是使用YARN的日志收集工具(
    logs -applicationId``` ),这个工具可以收集你应用程序相关的运行日志,但是这个工具是有限制的:**应用程序必须运行完**,因为YARN必须首先聚合这些日志;而且你必须开启日志聚合功能(yarn.log-aggregation-enable,在默认情况下,这个参数是false)。
    1
    2
    3
    4
    ``` powershell
    ./bin/yarn logs -applicationId application_1452073024926_19404
    or
    Tracking UI --> ApplicationMaster --> Executors菜单 --> Logs

2.2 通过可视化途径理解你的Spark应用程序

通过可视化途径理解你的Spark应用程序

1
2
Tracking UI --> ApplicationMaster -->Jobs菜单/Stages菜单
分析shuffle的时间

2.3 Spark作业代码(源码)IDE远程调试

Spark作业代码(源码)IDE远程调试

2.4 不要直接继承scala.App来提交代码

如果你继承了App trait,那么里面的变量被当作了单例类的field了;而如果是main方法,则当作是局部变量了。而且trait App是继承了DelayedInit,所以里面的变量只有用到了main方法的时候才会被初始化。
Spark程序编写:继承App的问题

三 REST API

Spark 1.4中REST API介绍
目前这个API支持正在运行的应用程序,也支持历史服务器。在请求URL都有/api/v1。比如,对于历史服务器来说,我们可以通过http://:18080/api/v1来获取一些信息;对于正在运行的Spark应用程序,我们可以通过http://www.iteblog.com:4040/api/v1来获取一些信息。

四 文件操作

4.1 Spark多文件输出(MultipleOutputFormat)

Spark多文件输出(MultipleOutputFormat)

4.2 Json

1
2
3
4
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val people = sqlContext.jsonFile("[the path to file people]")
people.registerTempTable("people")
people.printSchema()
1
2
3
CREATE TEMPORARY TABLE people
USING org.apache.spark.sql.json
OPTIONS (path '[the path to the JSON dataset]')

Spark SQL中对Json支持的详细介绍

五 shuffle

Spark shuffle:hash和sort性能对比
随着mapper数量或者Reduce数量的增加,基于hash的shuffle实现的表现比基于sort的shuffle实现的表现越来越糟糕。基于这个事实,在Spark 1.2版本,默认的shuffle将选用基于sort的。在MLlib下,基于sort的Shuffle并不一定比基于hash的Shuffle表现好,所以程序选择哪个Shuffle实现是需要考虑到具体的场景,如果内置的Shuffle实现不能满足自己的需求,我们完全可以自己实现一个Shuffle。用户自定义的Shuffle必须继承ShuffleManager类,重写里面的一些方法。

六 partition

Spark分区器HashPartitioner和RangePartitioner代码详解
Spark自定义分区(Partitioner)
Spark提供了相应的接口,我们只需要扩展Partitioner抽象类,然后实现里面的三个方法:

1
2
3
4
5
6
7
8
9
10
package org.apache.spark

/**
* An object that defines how the elements in a key-value pair RDD are partitioned by key.
* Maps each key to a partition ID, from 0 to `numPartitions - 1`.
*/
abstract class Partitioner extends Serializable {
def numPartitions: Int
def getPartition(key: Any): Int
}

  • def numPartitions: Int:这个方法需要返回你想要创建分区的个数;
  • def getPartition(key: Any): Int:这个函数需要对输入的key做计算,然后返回该key的分区ID,范围一定是0到numPartitions-1;
  • equals():这个是Java标准的判断相等的函数,之所以要求用户实现这个函数是因为Spark内部会比较两个RDD的分区是否一样。

七 序列化

Spark Task序列化代码分析
在Spark中自定义Kryo序列化输入输出API
require-kryo-serialization-in-spark-scala

解决序列化问题:

  1. 使用lazy引用(Lazy Reference)来实现
    lazy val pool = new JedisPool(new GenericObjectPoolConfig(), redisHost, redisPort, redisTimeout)
  2. 把对需要序列化对象的管理放在操作DStream的Output操作范围之内,因为我们知道它是在特定的Executor中进行初始化的,使用一个单例的对象来管理

八 RDD

Spark RDD API扩展开发(1): 自定义函数
Spark RDD API扩展开发(2):自定义RDD

九 kafka

Spark Streaming和Kafka整合开发指南(一)
Spark Streaming和Kafka整合开发指南(二)
Spark+Kafka的Direct方式将偏移量发送到Zookeeper实现
Spark Streaming和Kafka整合是如何保证数据零丢失
Kafka+Spark Streaming+Redis实时系统实践