一 快速入门
Apache Spark快速入门:基本概念和例子(1)
Apache Spark快速入门:基本概念和例子(2)
Spark Streaming整体执行流程http://flykobe.com/index.php/2016/03/22/spark-streaming-execution/
二 应用调试
2.1 Spark应用程序运行的日志存在哪里
Spark应用程序运行的日志存在哪里
Spark日志确切的存放路径和部署模式相关:
- 如果是Spark Standalone模式,我们可以直接在Master UI界面查看应用程序的日志,在默认情况下这些日志是存储在worker节点的work目录下,这个目录可以通过SPARK_WORKER_DIR参数进行配置。
- 如果是Mesos模式,我们同样可以通过Mesos的Master UI界面上看到相关应用程序的日志,这些日志是存储在Mesos slave的work目录下。
- 如果是YARN模式,最简单地收集日志的方式是使用YARN的日志收集工具(
logs -applicationId``` ),这个工具可以收集你应用程序相关的运行日志,但是这个工具是有限制的:**应用程序必须运行完**,因为YARN必须首先聚合这些日志;而且你必须开启日志聚合功能(yarn.log-aggregation-enable,在默认情况下,这个参数是false)。 1
2
3
4``` powershell
./bin/yarn logs -applicationId application_1452073024926_19404
or
Tracking UI --> ApplicationMaster --> Executors菜单 --> Logs
2.2 通过可视化途径理解你的Spark应用程序
通过可视化途径理解你的Spark应用程序1
2Tracking UI --> ApplicationMaster -->Jobs菜单/Stages菜单
分析shuffle的时间
2.3 Spark作业代码(源码)IDE远程调试
2.4 不要直接继承scala.App来提交代码
如果你继承了App trait,那么里面的变量被当作了单例类的field了;而如果是main方法,则当作是局部变量了。而且trait App是继承了DelayedInit,所以里面的变量只有用到了main方法的时候才会被初始化。
Spark程序编写:继承App的问题
三 REST API
Spark 1.4中REST API介绍
目前这个API支持正在运行的应用程序,也支持历史服务器。在请求URL都有/api/v1。比如,对于历史服务器来说,我们可以通过http://:18080/api/v1来获取一些信息;对于正在运行的Spark应用程序,我们可以通过http://www.iteblog.com:4040/api/v1来获取一些信息。
四 文件操作
4.1 Spark多文件输出(MultipleOutputFormat)
Spark多文件输出(MultipleOutputFormat)
4.2 Json
1 | val sqlContext = new org.apache.spark.sql.SQLContext(sc) |
1 | CREATE TEMPORARY TABLE people |
五 shuffle
Spark shuffle:hash和sort性能对比
随着mapper数量或者Reduce数量的增加,基于hash的shuffle实现的表现比基于sort的shuffle实现的表现越来越糟糕。基于这个事实,在Spark 1.2版本,默认的shuffle将选用基于sort的。在MLlib下,基于sort的Shuffle并不一定比基于hash的Shuffle表现好,所以程序选择哪个Shuffle实现是需要考虑到具体的场景,如果内置的Shuffle实现不能满足自己的需求,我们完全可以自己实现一个Shuffle。用户自定义的Shuffle必须继承ShuffleManager类,重写里面的一些方法。
六 partition
Spark分区器HashPartitioner和RangePartitioner代码详解
Spark自定义分区(Partitioner)
Spark提供了相应的接口,我们只需要扩展Partitioner抽象类,然后实现里面的三个方法:1
2
3
4
5
6
7
8
9
10package org.apache.spark
/**
* An object that defines how the elements in a key-value pair RDD are partitioned by key.
* Maps each key to a partition ID, from 0 to `numPartitions - 1`.
*/
abstract class Partitioner extends Serializable {
def numPartitions: Int
def getPartition(key: Any): Int
}
- def numPartitions: Int:这个方法需要返回你想要创建分区的个数;
- def getPartition(key: Any): Int:这个函数需要对输入的key做计算,然后返回该key的分区ID,范围一定是0到numPartitions-1;
- equals():这个是Java标准的判断相等的函数,之所以要求用户实现这个函数是因为Spark内部会比较两个RDD的分区是否一样。
七 序列化
Spark Task序列化代码分析
在Spark中自定义Kryo序列化输入输出API
require-kryo-serialization-in-spark-scala
解决序列化问题:
- 使用lazy引用(Lazy Reference)来实现
lazy val pool = new JedisPool(new GenericObjectPoolConfig(), redisHost, redisPort, redisTimeout) - 把对需要序列化对象的管理放在操作DStream的Output操作范围之内,因为我们知道它是在特定的Executor中进行初始化的,使用一个单例的对象来管理
八 RDD
Spark RDD API扩展开发(1): 自定义函数
Spark RDD API扩展开发(2):自定义RDD
九 kafka
Spark Streaming和Kafka整合开发指南(一)
Spark Streaming和Kafka整合开发指南(二)
Spark+Kafka的Direct方式将偏移量发送到Zookeeper实现
Spark Streaming和Kafka整合是如何保证数据零丢失
Kafka+Spark Streaming+Redis实时系统实践