1. Yarn模式下资源分配
在Yarn模式下,ResourceManager(以后简称RM)会为每一个应用程序分配固定数量的Executors(默认是2个,可通过–num-executors设置),且为每个Executor分配固定的CPU和内存(默认1个CPU和512M内存,可通过-executor-memory and和–executor-cores设置 )不同应用程序的Executors之间是互相独立的,每个应用程序的Executors只负责自己的应用程序的任务运行和数据存储。每个Executor是一个独立的JVM进程,JVM之间内存是无法共享的。
2. 动态资源分配的简介
动态资源调度是在Spark1.2之后才开始加进去的,
①原理:当一个应用程序不需要使用资源,且后续没有资源申请的时候,它就将现有的资源交回给集群,这样集群便可以将资源分配给其他应用程序来使用。当该应用程序再次需要资源的时候就再去向集群申请。
②优点:当集群中有很多应用程序的时候可以提高集群的资源利用率,比如集群中有一个应用程序作为服务一直在执行,集群给他分配了12个CPU和8G内存, 默认情况下,该应用会一直占用12个CPU和8G内存的资源,直到程序结束,如果目前该应用程序运行不需要这么多的资源,资源处于空闲状态的时候,集群也 不会把该资源分配给其他的应用程序,从而造成的资源的浪费。所以使用动态资源分配可以提高资源利用率。
③缺点:动态资源分配的调度算法如果不合理可能造成应用程序的高延迟,一种情况是某个应用程序刚把资源交回给集群,下一刻就又需要使用资源,然后在去向集群申请。
3. 动态资源分配的配置
①修改配置文件spark-defaults.conf,添加如下内容1
2
3
4
5
6
7
8
9
10
11
12spark.dynamicAllocation.enabled true //开启动态资源分配
spark.shuffle.service.enabled true //启用External shuffle Service服务
//shuffle.service是必须要启动的
spark.dynamicAllocation.minExecutors 1 //每个Application最小分配的executor数
spark.dynamicAllocation.maxExecutors 30 //每个Application最大并发分配的executor数
spark.shuffle.service.port 7337 //Shuffle Service服务端口,必须和yarn-site中的一致
spark.dynamicAllocation.schedulerBacklogTimeout 1s
spark.dynamicAllocation.sustainedSchedulerBacklogTimeout 5s
②在$SPARK_HOME/lib目录下找到spark-
③修改yarn-site.xml文件,设置如下参数
1 | <?xml version="1.0"?> |
④重新启动NodeManager。保证每个NodeManager节点读能找到spark-1.3.0-yarn-shuffle.jar该文件,否则NodeManager无法启动。我就是因为这个原因,导致花了很长时间才调试通。
4. 资源分配策略
资源的动态分配要求Spark在资源空闲的时候放弃Executors资源,在需要运行任务的时候申请Executors资源。但是很难预测一个即将放弃 Executors资源的应用程序下一刻是否会运行任务,或者一个刚申请的Executor在下一刻是否会处于空闲状态。
①请求策略
资源动态分配模式下,如果某个Spark应用程序有某些tasks处于等待运行状态时,该应用程序就会向集群申请更多的Executors资源,这意味着 集群为该应用程序分配的Executors资源不足以满足它提交的正在运行的,且还没有运行结束的tasks。Spark请求Executors采用的是 轮询的方式,当有tasks处于等待状态spark.dynamicAllocation.schedulerBacklogTimeout
秒后会触发一次Executors请求,如果之后tasks一直处于等待状态,那么每隔spark.dynamicAllocation.sustainedSchedulerBacklogTimeout
秒会触发一次Executors请求,每次轮询中申请的Executors个数以指数形式增长,比如第一个轮询申请1个Executor,之后会依次申请2,4,8…个Executors。
需要关注两点,一是谨慎申请Executors资源,以免申请到的Executors资源只有一小部分处于运行状态,其他的大部分处于空闲状态。二是充分利用申请到的Executors资源,以防止后续有大量Executors资源申请的情况出现。
②删除策略
当某个Spark应用程序的某个Executor处于空闲状态spark.dynamicAllocation.executorIdleTimeout
秒后就会删除该Executor,注意,如果有task处于等待状态的时候,该Executor就不会处于空闲状态。
5. 如何有效的删除Executor
在默认情况下,只有在Executor失败或者它所属的应用程序退出的情况下Executor才会结束,这两种情况下Executor的状态都不需要保 存,可以直接删除。在动态资源分配情况下,在Executor退出之前应用程序还在执行,所以在Executor的被删除之前需要将它的状态需要保存下 来,等下次执行Executor的时候可以重用。对于Shuffle保存Executor的状态尤其重要,在Shuffle阶段Executor会将 Map阶段的输出结果保存到本地文件系统中,然后在Reduce阶段其他的Executors会从该文件系统中读取Map阶段的输出结果。动态资源分配模 式下,一个Executor有可能在它的Shuffle还没有完成之前就将该Executor删除了,这就导致Shuffle阶段Executor保存的 文件在下次重新启动Executor的时候需要进行不必要的重新计算。
解决方法是使用额外的Shuffle Service,这个特性是在Spark1.2之后加进来的,Shuffle Service是运行在集群每个节点上的服务,它与Spark应用程序及其Executors是相互独立的,如果Shuffle Service在运行,Spark的Executors就会从Shuffle Service获取Shuffle阶段保存的文件,而不是去每个节点获取。这就意味着一个Executors被删除之后,它在Shuffle阶段的状态信 息还一直运行在Shuffle Service中。
6. 动态资源分配相关配置
红色参数比较重要,在使用动态资源分配的时候最好自行设置。
属性名 | 默认值 | 说明 |
---|---|---|
spark.dynamicAllocation.enabled |
false | 是否启动动态资源分配,默认不启动,目前只支持Yarn模式,Standalone和Mesoso暂不支持 |
spark.dynamicAllocation.executorIdleTimeout | 600 | 如果某个Executor在该时间段内处于空闲状态,那么它将被删除,其所占用的资源将返还给集群 |
spark.dynamicAllocation.initialExecutors |
spark.dynamicAllocation.minExecutors | 动态资源分配启动时初始化的Executor的个数,默认情况下与spark.dynamicAllocation.minExecutors配置的值相同,如果启动动态资源分配,那么该值必须大于0,否则应用程序会因为没有资源而无法运行 |
spark.dynamicAllocation.maxExecutors |
Integer.MAX_VALUE | 动态资源分配的Executors的最大个数 |
spark.dynamicAllocation.minExecutors |
0 | 分配的最小Executors个数,默认为0 |
spark.dynamicAllocation.schedulerBacklogTimeout | 5 | 如果有tasks处于等待状态,多少秒后开始向集群申请Executors资源,默认是5秒 |
spark.dynamicAllocation.sustainedSchedulerBacklogTimeout | schedulerBacklogTimeout | 如果申请的Executors资源无法满足当前Tasks的需求,那么采用轮询的方式向集群申请Executors资源,该值为轮询的时间间隔,默认值与schedulerBacklogTimeout相同 |