异步执行和异常处理
Spark使用RDD作为基本单元来构建基于大量数据的算法.
在RDD上你有两个操作:转换 transformation和行动 actions.转换操作会通过前一个RDD构建一个新的RDD.比如map和flatMap.1
2val lines:RDD[String]=sc.textFile("large_file.txt")
val tokens = lines.flatMap(_ split " ")
而行动操作则基于RDD计算结果出来.然后返回给驱动程序或者保存到外部的存储系统(HDFS,HBase)等1
tokens.saveAsTextFile("/some/uotput/file.txt")
最后,关于RDD, 你需要记住: 1
2
3
4
5
6
7
8
9
10
11
12
13
在RDD上处理转换操作的时候, 可能会出错和抛出异常.通常的处理方法就是把转换操作用try-catch包裹起来
``` scala
val lines: RDD[String] = sc.textFile("large_file.txt")
try {
val tokens = lines.flatMap(_ split " ")
// This transformation can throw an exception
.map(s => s(10))
} catch {
case e : StringIndexOutOfBoundsException =>
// Doing something in response of the exception
}
tokens.saveAsTextFile("/some/output/file.txt")
不幸的是, 转换里的代码直到第一次行动 执行时才会真的执行.也就是说上面的处理异常的代码是完全无用的.我们能做的也就只能时把行动 操作用try-catch包裹起来1
2
3
4
5
6
7
8
9
10
11
12val lines: RDD[String] = sc.textFile("large_file.txt")
val tokens =
lines.flatMap(_ split " ")
.map(s => s(10))
try {
// This try-catch block catch all the exceptions thrown by the
// preceding transformations.
tokens.saveAsTextFile("/some/output/file.txt")
} catch {
case e : StringIndexOutOfBoundsException =>
// Doing something in response of the exception
}
你可以看到,我们丢失了异常处理的位置
使用这种方法,我们会丢失抛出异常的元素.此外,Spark是用来处理大量数据的:我们能确定我们的目的就是仅仅因为一个RDD里一个元素的错误就要阻塞整个执行过程吗?
函数式编程和异常处理
第二种处理方法则是将try-catch移动到转换 操作中. 以上代码变为:
val tokens =
lines.flatMap(_ split “ “)
.map {
s => try {
s(10)
} catch {
case e : StringIndexOutOfBoundsException =>
// What the hell can we return in this case?
}
} // end of map
通过这么做,我们重新获得了位置 特征! 但是,通过这种方法,我们又引入了另一个问题.先前说过: 一个转换操作从旧的RDD里构建了一个新的RDD.转换操作map的偏函数输入的原始类型是String=>Char
为了保留这个特征,我们不得不在case语句中返回一个Char或他的子类型.我们该怎么选择?空的字符?一个特殊的字符? 这些选择显然迟早会造成其他的问题.
对于Spark Transformation算子中的异常处理,直接写try catch会导致需要添加很多额外的代码。1
2
3
4
5
6
7
8
9
10
11
12val data = Seq("123", "12aa")
val dataRDD = sc.parallelize(data)
val intRDD = dataRDD.map {
item =>
try {
item.toInt
} catch {
case e: NumberFormatException =>
0
}
}.collect()
1 | val anotherRDD = dataRDD.map { |