Spark动态资源分配

1. Yarn模式下资源分配

在Yarn模式下,ResourceManager(以后简称RM)会为每一个应用程序分配固定数量的Executors(默认是2个,可通过–num-executors设置),且为每个Executor分配固定的CPU和内存(默认1个CPU和512M内存,可通过-executor-memory and和–executor-cores设置 )不同应用程序的Executors之间是互相独立的,每个应用程序的Executors只负责自己的应用程序的任务运行和数据存储。每个Executor是一个独立的JVM进程,JVM之间内存是无法共享的。

2. 动态资源分配的简介

动态资源调度是在Spark1.2之后才开始加进去的,

①原理:当一个应用程序不需要使用资源,且后续没有资源申请的时候,它就将现有的资源交回给集群,这样集群便可以将资源分配给其他应用程序来使用。当该应用程序再次需要资源的时候就再去向集群申请。

②优点:当集群中有很多应用程序的时候可以提高集群的资源利用率,比如集群中有一个应用程序作为服务一直在执行,集群给他分配了12个CPU和8G内存, 默认情况下,该应用会一直占用12个CPU和8G内存的资源,直到程序结束,如果目前该应用程序运行不需要这么多的资源,资源处于空闲状态的时候,集群也 不会把该资源分配给其他的应用程序,从而造成的资源的浪费。所以使用动态资源分配可以提高资源利用率。

③缺点:动态资源分配的调度算法如果不合理可能造成应用程序的高延迟,一种情况是某个应用程序刚把资源交回给集群,下一刻就又需要使用资源,然后在去向集群申请。

3. 动态资源分配的配置

①修改配置文件spark-defaults.conf,添加如下内容

1
2
3
4
5
6
7
8
9
10
11
12
spark.dynamicAllocation.enabled   true   //开启动态资源分配
spark.shuffle.service.enabled true //启用External shuffle Service服务
//shuffle.service是必须要启动的


spark.dynamicAllocation.minExecutors 1 //每个Application最小分配的executor数
spark.dynamicAllocation.maxExecutors 30 //每个Application最大并发分配的executor数

spark.shuffle.service.port 7337 //Shuffle Service服务端口,必须和yarn-site中的一致

spark.dynamicAllocation.schedulerBacklogTimeout 1s
spark.dynamicAllocation.sustainedSchedulerBacklogTimeout 5s

②在$SPARK_HOME/lib目录下找到spark--yarn-shuffle.jar文件,将该jar文件添加到NodeManager节点的classpath环境中。

③修改yarn-site.xml文件,设置如下参数

1
2
3
4
5
6
7
8
9
10
11
12
<?xml version="1.0"?>
<configuration>
...........
<property>
<name>yarn.nodemanager.aux-services</name>
<value>spark_shuffle</value>
</property>
<property>
<name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
<value>org.apache.spark.network.yarn.YarnShuffleService</value>
</property>
</configuration>

④重新启动NodeManager。保证每个NodeManager节点读能找到spark-1.3.0-yarn-shuffle.jar该文件,否则NodeManager无法启动。我就是因为这个原因,导致花了很长时间才调试通。

4. 资源分配策略

资源的动态分配要求Spark在资源空闲的时候放弃Executors资源,在需要运行任务的时候申请Executors资源。但是很难预测一个即将放弃 Executors资源的应用程序下一刻是否会运行任务,或者一个刚申请的Executor在下一刻是否会处于空闲状态。

①请求策略

资源动态分配模式下,如果某个Spark应用程序有某些tasks处于等待运行状态时,该应用程序就会向集群申请更多的Executors资源,这意味着 集群为该应用程序分配的Executors资源不足以满足它提交的正在运行的,且还没有运行结束的tasks。Spark请求Executors采用的是 轮询的方式,当有tasks处于等待状态spark.dynamicAllocation.schedulerBacklogTimeout秒后会触发一次Executors请求,如果之后tasks一直处于等待状态,那么每隔spark.dynamicAllocation.sustainedSchedulerBacklogTimeout秒会触发一次Executors请求,每次轮询中申请的Executors个数以指数形式增长,比如第一个轮询申请1个Executor,之后会依次申请2,4,8…个Executors。

需要关注两点,一是谨慎申请Executors资源,以免申请到的Executors资源只有一小部分处于运行状态,其他的大部分处于空闲状态。二是充分利用申请到的Executors资源,以防止后续有大量Executors资源申请的情况出现。

②删除策略

当某个Spark应用程序的某个Executor处于空闲状态spark.dynamicAllocation.executorIdleTimeout秒后就会删除该Executor,注意,如果有task处于等待状态的时候,该Executor就不会处于空闲状态。

5. 如何有效的删除Executor

在默认情况下,只有在Executor失败或者它所属的应用程序退出的情况下Executor才会结束,这两种情况下Executor的状态都不需要保 存,可以直接删除。在动态资源分配情况下,在Executor退出之前应用程序还在执行,所以在Executor的被删除之前需要将它的状态需要保存下 来,等下次执行Executor的时候可以重用。对于Shuffle保存Executor的状态尤其重要,在Shuffle阶段Executor会将 Map阶段的输出结果保存到本地文件系统中,然后在Reduce阶段其他的Executors会从该文件系统中读取Map阶段的输出结果。动态资源分配模 式下,一个Executor有可能在它的Shuffle还没有完成之前就将该Executor删除了,这就导致Shuffle阶段Executor保存的 文件在下次重新启动Executor的时候需要进行不必要的重新计算。

解决方法是使用额外的Shuffle Service,这个特性是在Spark1.2之后加进来的,Shuffle Service是运行在集群每个节点上的服务,它与Spark应用程序及其Executors是相互独立的,如果Shuffle Service在运行,Spark的Executors就会从Shuffle Service获取Shuffle阶段保存的文件,而不是去每个节点获取。这就意味着一个Executors被删除之后,它在Shuffle阶段的状态信 息还一直运行在Shuffle Service中。

6. 动态资源分配相关配置

红色参数比较重要,在使用动态资源分配的时候最好自行设置。

属性名 默认值 说明
spark.dynamicAllocation.enabled false 是否启动动态资源分配,默认不启动,目前只支持Yarn模式,Standalone和Mesoso暂不支持
spark.dynamicAllocation.executorIdleTimeout 600 如果某个Executor在该时间段内处于空闲状态,那么它将被删除,其所占用的资源将返还给集群
spark.dynamicAllocation.initialExecutors spark.dynamicAllocation.minExecutors 动态资源分配启动时初始化的Executor的个数,默认情况下与spark.dynamicAllocation.minExecutors配置的值相同,如果启动动态资源分配,那么该值必须大于0,否则应用程序会因为没有资源而无法运行
spark.dynamicAllocation.maxExecutors Integer.MAX_VALUE 动态资源分配的Executors的最大个数
spark.dynamicAllocation.minExecutors 0 分配的最小Executors个数,默认为0
spark.dynamicAllocation.schedulerBacklogTimeout 5 如果有tasks处于等待状态,多少秒后开始向集群申请Executors资源,默认是5秒
spark.dynamicAllocation.sustainedSchedulerBacklogTimeout schedulerBacklogTimeout 如果申请的Executors资源无法满足当前Tasks的需求,那么采用轮询的方式向集群申请Executors资源,该值为轮询的时间间隔,默认值与schedulerBacklogTimeout相同