Spark Functions Explained

cogroup

Groups together the values that share the same key across multiple RDDs.

def cogroup[W1, W2, W3](other1: RDD[(K, W1)], other2: RDD[(K, W2)],
    other3: RDD[(K, W3)], partitioner: Partitioner)
  : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))]
def cogroup[W1, W2, W3](other1: RDD[(K, W1)], other2: RDD[(K, W2)],
    other3: RDD[(K, W3)], numPartitions: Int)
  : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))]
def cogroup[W1, W2, W3](other1: RDD[(K, W1)], other2: RDD[(K, W2)], other3: RDD[(K, W3)])
  : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))]
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner)
  : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], numPartitions: Int)
  : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)])
  : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]
def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner)
  : RDD[(K, (Iterable[V], Iterable[W]))]
def cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W]))]
def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]

There are nine cogroup signatures in total (quite a lot!), and up to four RDDs can be combined at once.

val data1 = sc.parallelize(List((1, "www"), (2, "bbs")))
val data2 = sc.parallelize(List((1, "iteblog"), (2, "iteblog"), (3, "very"), (3, "good")))
val data3 = sc.parallelize(List((1, "com"), (2, "com"), (3, "good")))
val result = data1.cogroup(data2, data3)
result.collect.foreach(println)

------------------------------------
(2,(CompactBuffer(bbs),CompactBuffer(iteblog),CompactBuffer(com)))
(1,(CompactBuffer(www),CompactBuffer(iteblog),CompactBuffer(com)))
(3,(CompactBuffer(),CompactBuffer(very, good),CompactBuffer(good)))

combineByKey

Combines the values of each key using user-supplied aggregation functions. It can turn an input of type RDD[(K, V)] into an RDD[(K, C)].

Function signatures

def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C): RDD[(K, C)]
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)]
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C, partitioner: Partitioner,
    mapSideCombine: Boolean = true, serializer: Serializer = null): RDD[(K, C)]

The first and second overloads are both implemented in terms of the third one, using a HashPartitioner and a null Serializer. The third overload lets us specify the partitioner ourselves, and a Serializer as well if one is needed. combineByKey is an important function: familiar operators such as aggregateByKey, foldByKey and reduceByKey are all built on top of it. By default, combining is also performed on the map side.

val data = sc.parallelize(List((1, "www"), (1, "iteblog"), (1, "com"),
  (2, "bbs"), (2, "iteblog"), (2, "com"), (3, "good")))
val result = data.combineByKey(List(_),
  (x: List[String], y: String) => y :: x,
  (x: List[String], y: List[String]) => x ::: y)
result.collect.foreach(println)

(2,List(com, iteblog, bbs))
(1,List(com, iteblog, www))
(3,List(good))

val data2 = sc.parallelize(List(("iteblog", 1), ("bbs", 1), ("iteblog", 3)))
val result2 = data2.combineByKey(x => x,(x: Int, y:Int) => x + y, (x:Int, y: Int) => x + y)
result2.collect.foreach(println)

(iteblog,4)
(bbs,1)
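
To make the RDD[(K, V)] to RDD[(K, C)] transformation more concrete, here is a small sketch (the names are made up for the example, not part of any API) that computes a per-key average by using a (sum, count) pair as the combiner type C, which differs from the value type Int:

val scores = sc.parallelize(List(("iteblog", 1), ("bbs", 1), ("iteblog", 3)))
val sumCount = scores.combineByKey(
  (v: Int) => (v, 1),                                                  // createCombiner: first value of a key
  (c: (Int, Int), v: Int) => (c._1 + v, c._2 + 1),                     // mergeValue: fold a value into the partition-local pair
  (c1: (Int, Int), c2: (Int, Int)) => (c1._1 + c2._1, c1._2 + c2._2))  // mergeCombiners: merge pairs from different partitions
val avg = sumCount.mapValues { case (sum, cnt) => sum.toDouble / cnt }
avg.collect.foreach(println)   // e.g. (iteblog,2.0) and (bbs,1.0); output order is not deterministic

This is essentially the pattern that reduceByKey, foldByKey and aggregateByKey wrap for you, which is why they are described above as being built on combineByKey.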

coalesce

Merges the partitions of an RDD into a new set of partitions.

Function signature

def coalesce(numPartitions: Int, shuffle: Boolean = false)
    (implicit ord: Ordering[T] = null): RDD[T]

Returns a new RDD with numPartitions partitions. If shuffle is set to true, a shuffle is performed. Note that with shuffle = false, coalesce can only decrease the number of partitions; requesting more partitions than the RDD currently has leaves the partition count unchanged.

var data = sc.parallelize(List(1,2,3,4))
val result = data.coalesce(2, false)
println(result.partitions.length)
println(result.toDebugString)
val result1 = data.coalesce(2, true)
println(result1.toDebugString)

2
(2) CoalescedRDD[1] at coalesce at CoalesceSuite.scala:16 []
| ParallelCollectionRDD[0] at parallelize at CoalesceSuite.scala:15 []

(2) MapPartitionsRDD[5] at coalesce at CoalesceSuite.scala:19 []
| CoalescedRDD[4] at coalesce at CoalesceSuite.scala:19 []
| ShuffledRDD[3] at coalesce at CoalesceSuite.scala:19 []
+-(2) MapPartitionsRDD[2] at coalesce at CoalesceSuite.scala:19 []
| ParallelCollectionRDD[0] at parallelize at CoalesceSuite.scala:15 []

The debug strings above show that no shuffle happens when shuffle is false, whereas a shuffle stage is introduced when it is true. RDD.partitions.length returns the number of partitions of an RDD.
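
One practical consequence, shown as a small sketch (assuming a 4-partition source RDD): without a shuffle, coalesce can only shrink the partition count, so growing it requires shuffle = true (or repartition, which is coalesce with shuffle = true):

val big = sc.parallelize(1 to 100, 4)
println(big.coalesce(2, false).partitions.length)  // 2 -- shrinking needs no shuffle
println(big.coalesce(8, false).partitions.length)  // 4 -- cannot grow without a shuffle
println(big.coalesce(8, true).partitions.length)   // 8 -- shuffle = true allows growing
println(big.repartition(8).partitions.length)      // 8 -- repartition(n) is coalesce(n, shuffle = true)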

checkpoint

Sets a checkpoint for the current RDD. The function writes the RDD as binary files into the checkpoint directory configured with SparkContext.setCheckpointDir(). During checkpointing, all information about the RDD's dependencies on its parent RDDs is removed, i.e. the lineage is truncated. Calling checkpoint on an RDD does not execute immediately; an action must be run to trigger it.

def checkpoint()

scala> val data = sc.parallelize(1 to 100000, 15)
scala> sc.setCheckpointDir("/app/ecom/cm/ods/tmp/sunke/spark/checkpoint")
scala> data.checkpoint
scala> data.count
res3: Long = 100000

Result:

[work@yq01-cm-m32-201502nova228.yq01.baidu.com ~]$ hadoop fs -ls /app/ecom/cm/ods/tmp/sunke/spark/checkpoint
16/05/18 11:27:20 INFO common.UpdateService: ZkstatusUpdater to yq01-wutai-hdfs.dmop.baidu.com:54310 started
Found 1 items
drwxr-xr-x 3 ods ods 0 2016-05-18 11:26 /app/ecom/cm/ods/tmp/sunke/spark/checkpoint/590a5e9b-26de-416f-aecb-22ea5e29d893
After count finishes, several binary files (how many depends on the number of partitions) appear under the checkpoint directory.
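
A small follow-up sketch (the directory path here is just an example): because the action that triggers the checkpoint recomputes the RDD when writing it out, it is common to cache the RDD before checkpointing; isCheckpointed and getCheckpointFile can then confirm the result.

sc.setCheckpointDir("/tmp/spark/checkpoint")   // example path; on a cluster use an HDFS directory
val nums = sc.parallelize(1 to 100000, 15)
nums.cache()            // otherwise the whole lineage is recomputed when the checkpoint is written
nums.checkpoint()       // lazy: nothing is written yet
nums.count()            // the action runs the job and then writes the checkpoint files
println(nums.isCheckpointed)      // true
println(nums.getCheckpointFile)   // Some(<directory under the checkpoint dir>)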

cartesian

As the name suggests, this computes the Cartesian product of the two given RDDs. Official documentation: Return the Cartesian product of this RDD and another one, that is, the RDD of all pairs of elements (a, b) where a is in this and b is in other.

def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)]

The function returns a pair RDD whose elements are all combinations of one element from the current RDD with one element from the other RDD. The concrete RDD returned is a CartesianRDD.

val a = sc.parallelize(List(1,2,3))
val b = sc.parallelize(List(4,5,6))
val result = a.cartesian(b)
result.collect.foreach(println)

(1,4)
(1,5)
(1,6)
(2,4)
(3,4)
(2,5)
(2,6)
(3,5)
(3,6)

Cartesian products are scary: they blow up quickly and can consume a huge amount of memory, so use this function with care.
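
A quick sanity check before calling cartesian (a minimal sketch): the result has exactly a.count * b.count elements, so the output size can be estimated up front.

val x = sc.parallelize(1 to 10000)
val y = sc.parallelize(1 to 10000)
val pairs = x.count * y.count      // 100000000 pairs -- decide whether that is acceptable first
println("cartesian would produce " + pairs + " elements")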

cache

Caches the RDD at the MEMORY_ONLY storage level; internally it simply calls persist(). Official documentation: Persist this RDD with the default storage level (MEMORY_ONLY).

def cache(): this.type

scala> var data = sc.parallelize(List(1,2,3,4))
data: org.apache.spark.rdd.RDD[Int] =
  ParallelCollectionRDD[44] at parallelize at :12
scala> data.getStorageLevel
res65: org.apache.spark.storage.StorageLevel =
  StorageLevel(false, false, false, false, 1)
scala> data.cache
res66: org.apache.spark.rdd.RDD[Int] =
  ParallelCollectionRDD[44] at parallelize at :12
scala> data.getStorageLevel
res67: org.apache.spark.storage.StorageLevel =
  StorageLevel(false, true, false, true, 1)
We first define an RDD and use getStorageLevel to check its default storage level, which is NONE here. After calling cache, the storage level becomes MEMORY_ONLY (look at the second field of the printed StorageLevel, which indicates whether memory is used). For the other storage levels, see the StorageLevel class.
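
Since cache is just persist with the MEMORY_ONLY level, here is a short sketch of picking a different storage level explicitly and releasing it again (a minimal example, not the only way to do this):

import org.apache.spark.storage.StorageLevel

val big = sc.parallelize(1 to 1000000)
big.persist(StorageLevel.MEMORY_AND_DISK)   // keep in memory, spill partitions to disk when memory is tight
println(big.getStorageLevel)                // useDisk and useMemory are now both true
big.unpersist()                             // release the cached blocks when they are no longer needed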

aggregate

Aggregate the elements of each partition, and then the results for all the partitions, using given combine functions and a neutral “zero value”. This function can return a different result type, U, than the type of this RDD, T. Thus, we need one operation for merging a T into an U and one operation for merging two U’s, as in scala.TraversableOnce. Both of these functions are allowed to modify and return their first argument instead of creating a new U to avoid memory allocation.
The aggregate function first aggregates the elements within each partition, and then uses the combine function to merge the per-partition results together with the initial value (zeroValue). The final return type does not have to match the element type of the RDD.

def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U

def seqOp(a: Int, b: Int): Int = {
  println("seqOp: " + a + "\t" + b)
  a + b
}

def combOp(a: Int, b: Int): Int = {
  println("combOp: " + a + "\t" + b)
  a + b
}

val z1 = sc.parallelize(List(1, 2, 3, 4, 5, 6), 2)
val result1 = z1.aggregate(3)(seqOp, combOp)
println("result1: " + result1)

The computation proceeds as follows:
Aggregation within each partition:
3 (zero value) + 1 + 2 + 3 = 9
3 (zero value) + 4 + 5 + 6 = 18
Combine step:
3 (zero value) + 9 + 18 = 30

result1: 30

1. The seq (reduce) function and the combine function must be commutative and associative.
2. As the definition of aggregate shows, the output type of the combine function must be the same as its input type (combOp: (U, U) => U).
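
To make the point about the return type concrete, here is a small sketch in which U differs from the RDD's element type: the zero value is a (sum, count) pair, which yields the mean of an RDD[Int].

val nums = sc.parallelize(List(1, 2, 3, 4, 5, 6), 2)
val (sum, cnt) = nums.aggregate((0, 0))(            // U = (Int, Int) while T = Int
  (acc, v) => (acc._1 + v, acc._2 + 1),             // seqOp: fold one Int into the (sum, count) pair
  (a, b) => (a._1 + b._1, a._2 + b._2))             // combOp: merge two partial (sum, count) pairs
println(sum.toDouble / cnt)                         // 3.5; the zero (0, 0) is neutral, so it is safe to fold in repeatedly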

aggregateByKey

This function is similar to aggregate, but it operates on pair RDDs.
Aggregate the values of each key, using given combine functions and a neutral “zero value”. This function can return a different result type, U, than the type of the values in this RDD, V. Thus, we need one operation for merging a V into a U and one operation for merging two U’s, as in scala.TraversableOnce. The former operation is used for merging values within a partition, and the latter is used for merging values between partitions. To avoid memory allocation, both of these functions are allowed to modify and return their first argument instead of creating a new U.
aggregateByKey aggregates the values of identical keys in a pair RDD, again using a neutral initial value during the aggregation. As with aggregate, the type of the aggregated values does not have to match the type of the values in the RDD. Because aggregateByKey aggregates the values of each key, it still returns a pair RDD of keys and their aggregated values, whereas aggregate returns a single, non-RDD result; this difference is worth keeping in mind. Three aggregateByKey signatures are defined, but they all end up calling the same underlying implementation.

def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)
    (seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
The first aggregateByKey overload lets us supply a custom Partitioner. Apart from that parameter, its declaration is very similar to aggregate; the other overloads ultimately delegate to this one.

def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int)
    (seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
The second overload lets us set the number of partitions (numPartitions); internally it uses a HashPartitioner.

def aggregateByKey[U: ClassTag](zeroValue: U)
    (seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
The last overload first checks whether the current RDD already defines a partitioner; if so, it reuses it, otherwise it falls back to a HashPartitioner.

var data = sc.parallelize(List((1, 3), (1, 2), (1, 4), (2, 3)))

def seq(a: Int, b: Int): Int = {
  println("seq: " + a + "\t" + b)
  a + b
}

def comb(a: Int, b: Int): Int = {
  println("comb: " + a + "\t" + b)
  a + b
}

val result = data.aggregateByKey(1)(seq, comb)
result.collect.foreach(println)
The computation works like this:
Key 1: 1 (zero value) + 3 + 2 = 6 in one partition and 1 (zero value) + 4 = 5 in the other, combined to 6 + 5 = 11
Key 2: 1 (zero value) + 3 = 4
(2,4)
(1,11)

Note that aggregateByKey and aggregate treat the zero value slightly differently. In aggregateByKey the zero value only takes part in the per-partition seq step, once per key per partition; it is not folded in again during the combine step. In aggregate the zero value is used per partition and then once more in the combine step. For key 1 above (the values 3, 2 and 4 spread over two partitions), aggregateByKey produces (1 + 3 + 2) + (1 + 4) = 11, whereas aggregate over the same elements with the same partitioning would yield 1 + 6 + 5 = 12, because the zero value is added once more when the partition results are combined.

sortBy and sortByKey

Many applications need to sort their result data, and Spark is no exception. Spark provides two functions for sorting an RDD: sortBy and sortByKey. sortBy sorts a plain RDD and was only introduced in Spark 0.9.0 (see SPARK-1063), while sortByKey sorts a pair RDD, i.e. an RDD of keys and values. The implementation and usage of both functions are described below.

sortBy

The sortBy function is implemented in the org.apache.spark.rdd.RDD class, as shown below.
It takes up to three parameters:
  The first parameter is a function f: T => K that maps each element of the RDD (of type T) to the key to sort by; the sorted RDD keeps the same element type as the original RDD.
  The second parameter is ascending; as the name suggests, it decides whether the sorted RDD is in ascending or descending order, and it defaults to true (ascending).
  The third parameter is numPartitions, which determines the number of partitions of the sorted RDD; by default it equals the number of partitions before sorting, i.e. this.partitions.size.

/**
 * Return this RDD sorted by the given key function.
 */
def sortBy[K](
    f: (T) => K,
    ascending: Boolean = true,
    numPartitions: Int = this.partitions.size)
    (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] =
  this.keyBy[K](f)
      .sortByKey(ascending, numPartitions)
      .values

As the implementation of sortBy shows, the first parameter is required, while the other two may be omitted. sortBy is implemented on top of sortByKey, which is described later. keyBy is also implemented in the RDD class: it applies f to every element and returns tuples of (f(x), x), turning the RDD into a key-value RDD. Its implementation is shown below:

/**
 * Creates tuples of the elements in this RDD by applying `f`.
 */
def keyBy[K](f: T => K): RDD[(K, T)] = {
  map(x => (f(x), x))
}

Demo:

val data = List(3,1,90,3,5,12)
val rdd = sc.parallelize(data)
val result = rdd.sortBy(x => x, false)
result.collect.foreach(println)
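
Because ascending is set to false, the elements are printed in descending order: 90, 12, 5, 3, 3, 1.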

sortByKey

The sortByKey function works on key-value RDDs and sorts them by key. It is implemented in org.apache.spark.rdd.OrderedRDDFunctions, as follows:

def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.size)
    : RDD[(K, V)] =
{
  val part = new RangePartitioner(numPartitions, self, ascending)
  new ShuffledRDD[K, V, V](self, part)
    .setKeyOrdering(if (ascending) ordering else ordering.reverse)
}

The RDD returned by this function is always a ShuffledRDD: sorting the source RDD requires a shuffle, and the result of a shuffle is a ShuffledRDD. The implementation is quite elegant: a RangePartitioner places keys that fall in the same range into the same partition, and each partition is then sorted locally with a standard sort, so a single range-partitioning shuffle plus per-partition sorts produce a totally ordered RDD without any further shuffling or a global merge step.

val a = sc.parallelize(List("wyp", "iteblog", "com", "397090770", "test"), 2)
val b = sc.parallelize(List(3, 1, 9, 12, 4))
val c = b.zip(a)
c.sortByKey().collect

// Override the default ordering: sort the Int keys by their string representation
implicit val sortIntegersByString = new Ordering[Int] {
  override def compare(a: Int, b: Int) =
    a.toString.compare(b.toString)
}
c.sortByKey().collect

collectAsMap

This works much like collect, but it is used on pair RDDs and returns a Map. Official documentation:
Return the key-value pairs in this RDD to the master as a Map.
Warning: this doesn't return a multimap (so if you have multiple values to the same key, only one value per key is preserved in the map returned).

def collectAsMap(): Map[K, V]

val data = sc.parallelize(List((1, "www"), (1, "iteblog"), (1, "com"), (2, "bbs"), (2, "iteblog"), (2, "com"), (3, "good")))
data.collectAsMap.foreach(print)
(2,com)(1,com)(3,good)

As the result shows, when a key appears multiple times in the RDD, later values overwrite earlier ones, so the returned map contains exactly one value per key.
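
If all values per key are needed, one option (a small sketch using the data RDD from the example above, not part of the collectAsMap API itself) is to group the values first and then collect the grouped RDD as a map:

val grouped = data.groupByKey().collectAsMap()   // Map[Int, Iterable[String]]
grouped.foreach { case (k, vs) => println(k + " -> " + vs.mkString(",")) }
// e.g. 2 -> bbs,iteblog,com   1 -> www,iteblog,com   3 -> good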