Spark如何处理异常

异步执行和异常处理

Spark使用RDD作为基本单元来构建基于大量数据的算法.

在RDD上你有两个操作:转换 transformation和行动 actions.转换操作会通过前一个RDD构建一个新的RDD.比如map和flatMap.

1
2
val lines:RDD[String]=sc.textFile("large_file.txt")
val tokens = lines.flatMap(_ split " ")

而行动操作则基于RDD计算结果出来.然后返回给驱动程序或者保存到外部的存储系统(HDFS,HBase)等

1
tokens.saveAsTextFile("/some/uotput/file.txt")

最后,关于RDD, 你需要记住:

you can define new RDDs any time, Spark computes them only in a lazy fashion —that is, the first time they are used in an action. ```
1
2
3
4
5
6
7
8
9
10
11
12
13

在RDD上处理转换操作的时候, 可能会出错和抛出异常.通常的处理方法就是把转换操作用try-catch包裹起来
``` scala
val lines: RDD[String] = sc.textFile("large_file.txt")
try {
val tokens = lines.flatMap(_ split " ")
// This transformation can throw an exception
.map(s => s(10))
} catch {
case e : StringIndexOutOfBoundsException =>
// Doing something in response of the exception
}
tokens.saveAsTextFile("/some/output/file.txt")

不幸的是, 转换里的代码直到第一次行动 执行时才会真的执行.也就是说上面的处理异常的代码是完全无用的.我们能做的也就只能时把行动 操作用try-catch包裹起来

1
2
3
4
5
6
7
8
9
10
11
12
val lines: RDD[String] = sc.textFile("large_file.txt")
val tokens =
lines.flatMap(_ split " ")
.map(s => s(10))
try {
// This try-catch block catch all the exceptions thrown by the
// preceding transformations.
tokens.saveAsTextFile("/some/output/file.txt")
} catch {
case e : StringIndexOutOfBoundsException =>
// Doing something in response of the exception
}

你可以看到,我们丢失了异常处理的位置

使用这种方法,我们会丢失抛出异常的元素.此外,Spark是用来处理大量数据的:我们能确定我们的目的就是仅仅因为一个RDD里一个元素的错误就要阻塞整个执行过程吗?

函数式编程和异常处理

第二种处理方法则是将try-catch移动到转换 操作中. 以上代码变为:

val tokens =
lines.flatMap(_ split “ “)
.map {
s => try {
s(10)
} catch {
case e : StringIndexOutOfBoundsException =>
// What the hell can we return in this case?
}
} // end of map
通过这么做,我们重新获得了位置 特征! 但是,通过这种方法,我们又引入了另一个问题.先前说过: 一个转换操作从旧的RDD里构建了一个新的RDD.转换操作map的偏函数输入的原始类型是String=>Char

为了保留这个特征,我们不得不在case语句中返回一个Char或他的子类型.我们该怎么选择?空的字符?一个特殊的字符? 这些选择显然迟早会造成其他的问题.
对于Spark Transformation算子中的异常处理,直接写try catch会导致需要添加很多额外的代码。

1
2
3
4
5
6
7
8
9
10
11
12
val data = Seq("123", "12aa")
val dataRDD = sc.parallelize(data)

val intRDD = dataRDD.map {
item =>
try {
item.toInt
} catch {
case e: NumberFormatException =>
0
}
}.collect()

1
2
3
4
5
val anotherRDD = dataRDD.map {
item =>
Try(item.toInt)
}.filter(_.isSuccess).collect()
anotherRDD.foreach(println)