谈谈Spark的计算本地性

转载:http://coolplayer.net/2017/05/02/%E8%B0%88%E8%B0%88spark-%E7%9A%84%E8%AE%A1%E7%AE%97%E6%9C%AC%E5%9C%B0%E6%80%A7

Spark 是计算追着数据走, Storm 是数据追着计算走, 所以如果数据量比较小,要求延迟比较小, 就适合storm, 但是如果数据量比较大, 这个时候如果传输数据, 就会碰到很大的带宽占用和性能下降, 这个时候就比较适合让计算去找数据.

但是在计算找数据的过程中, 是怎么让计算找到数据呢, 这个就是这篇文章谈的, spark 的计算本地性

不同的 Locality Level

  • PROCESS_LOCAL: 数据和 task 在同一个executor jvm 中,最好的就是这种 locality。
  • NODE_LOCAL: 数据在同一个节点上。比如数据在同一个节点的另一个 executor上;或在 HDFS 上,恰好有 block 在同一个节点上。速度比 PROCESS_LOCAL 稍慢,因为数据需要在不同进程之间传递或从文件中读取
  • NO_PREF: 数据从哪里访问都一样快,不需要位置优先
  • RACK_LOCAL: 数据在同一机架的不同节点上。需要通过网络传输数据及文件 IO,比 NODE_LOCAL 慢
  • ANY: 数据在非同一机架的网络上,速度最慢

Task 自己想在哪里执行

我们来看下 task 自己想在哪里执行, 这要根据 task 处理的数据是否缓存, task 的数据所在的 host 和 rack来判断, 在 DAGScheduler 中, 每个分区每个 stage 会成为一个task, 每个 task 会根据自己分区的数据的情况,进行判断自己的 本地性Level,

getPreferredLocs -> getPreferredLocsInternal

函数中, 会先判断是否rdd 本分区的数据是否已经缓存在 blockManager, 如果已经缓存, 获取到数据所在的 host 和 executorId, 然后设置本 task 的数据本地性 Level 为 PROCESS_LOCAL, 偏好某个 host 上的某个 executor 去执行,

如果没有缓存, 那么就不能是PROCESS_LOCAL , 最多也就是个 NODE_LOCAL,会根据不同的RDD类型,来调用具体的 getPreferredLocations 来判断数据本地性 Level, 和数据本地性偏好

假如这里是HadoopRDD, 那么每个 task 处理的数据就是一个 HadoopPartition, 其实代表 hdfs 中的一份数据 InputSplit, 它定义了分割的长度及位置。分割长度 是指分割数据的大小(以字节为单位),而分割位置 是分割所在的机器结点名称组成的列表, 分割位置中就能获取到 数据所在的 host 和 rack,

如果数据源头是 kafka, 那么每个 task 处理的数据就是 KafkaRDDPartition, 其实对应每一个topic的每一个partition, preferredHosts 中记录着每个 topic 中每个 partition 所在的 host, 就直接可以当做 偏好的 host, 如果kafka中broker和Spark在同一个集群中,此时getPreferredLocations获取本地性就可以极大提高效率,因为没有了数据网络传输的成本。

以上两种都偏好某个 host 去执行

这里需要注意的是,这里找的数据源头是 rdd的顺着窄依赖, 往上找父依赖, 直到找到第一个窄依赖, 也就找到了数据读取源头, 来决定数据本地性

如果读取的是 shuffle 的数据, 就不用考虑那么多了, 因为shuffle中的read task 是需要去所有的write task的disk上拉取数据的。

怎么能最大程度的满足 task 的本地性

我们都知道, 数据传输对内网带宽和性能有极大的损耗,所以要千方百计的最大程度的满足 更高级别的本地性,从优到差排, PROCESS_LOCAL > NODE_LOCAL > NO_PREF > RACK_LOCAL

所以spark 调度的总体原则就是总是尝试以最高的 locality level 去启动task, 如果对应需要是用到的 executor 正在使用中(跑别的task),满足不了, 就等一会(等待时间是有spark.locality.wait.process或spark.locality.wait.node或spark.locality.wait.rack来控制的), 看看过一会这个忙线的host 或者 executor是不是解脱了, 如果已经空闲了,我就可以把 task 放在它最期望的 host 或者 executor 上去运行了, 这里赌的就是一般来说,task 执行耗时相对于网络传输/文件IO 要小得多,调度器多等待1 2秒可能就可以以更好的本地性执行 task,避免了更耗时的网络传输或文件IO, 也是极棒的。

Spark的延迟调度

我们来看下 spark 的延迟调度的策略,

有时候 task 自己偏好某个 executor 中,

可以看下面的图直观理解一下

所以spark 调度的总体原则就是总是尝试以最高的 locality level 去启动task,
我举个例子, 假如 一个 task 要处理的数据,在上一个 stage 中缓存下来了, 这个 task 期望的 就是以 PROCESS_LOCAL 来运行, 这个时候缓存数据的executor 不巧正在执行 其他的task, 那么我就等一会, 等多长时间呢, spark.locality.wait.process这么长时间, 如果时间超了, executor 还是没有空闲下来, 那么我没有办法, 我就以NODE_LOCAL 来运行 task, 这个时候我想到 同一台机器上其他 executor 上跨jvm 去拉取数据, 如果同一台机器上有其他空闲的 executor 可以满足, 就这么干, 如果没有, 等待 spark.locality.wait.node 时间, 还没有就以更低的 Locality Level 去执行这个 task。