雁渡寒潭 风吹疏竹

教练,我想打篮球


  • Home

  • Tags

  • Categories

  • Archives

Spark on Yarn 为什么出现内存超界container被kill

Posted on 2018-09-13 | In Spark |

一个Executor对应一个JVM进程。 从Spark的角度看,Executor占用的内存分为两部分:

Memory)等。
1
2
3
4
5
6
7
8
9
10
11
12
13
14

```spark.driver.memory``` 和```spark.executor.memory``` 分别设置Spark的Driver和Executor的```ExecutorMemory```.

```spark.yarn.executor.memoryOverhead```和```spark.yarn.driver.memoryOverhead```分别设置Spark的Driver和Executor的```MemoryOverhead```.

另外,Spark会大量分配堆外内存,堆外内存默认最大可以和```ExecutorMemory```一样,可以通过javaOptions使用```MaxDirectMemorySize```配置最大值。



堆外内存最大可以和```ExecutorMemory```一样,但是堆外内存又受```MemoryOverhead```限制,所以当```MaxDirectMemorySize```,```ExecutorMemory```和```MemoryOverhead```设置不合理时,会出现container内存超限,被Yarn kill的情况。

比如,```ExecutorMemory``` 为8G,```MemoryOverhead```为4G,```MaxDirectMemorySize```没有设置,此时yarn认为一个container最大可以使用12G内存,但是堆外内存最大可以使用8G,导致container最大可以使用超过16G内存(堆内内存+ 堆外内存),比12G大, 最终被Yarn kill掉。

合理的设置规则为: ```ExecutorMemory``` + ```MemoryOverhead``` > ```ExecutorMemory``` + ```MaxDirectMemorySize

所以,Spark应用占用集群内存的总大小为:

(executor个数) * (SPARK_EXECUTOR_MEMORY+ spark.yarn.executor.memoryOverhead)+(SPARK_DRIVER_MEMORY+spark.yarn.driver.memoryOverhead)

参数调优建议:

每个Executor进程的内存设置4G~8G较为合适。

每个Executor的CPU core数量设置为2~4个较为合适。

以下是部分建议的参数设置:

1
2
3
4
5
6
7
8
9
10
11
12
--conf "spark.driver.extraJavaOptions=-XX:MaxDirectMemorySize=1024m -Xmn4g -XX:+UseCMSInitiatingOccupancyOnly -XX:CMSInitiatingOccupancyFraction=70 -XX:+UseConcMarkSweepGC -XX:+PrintGCDetails -XX:+PrintGCDateStamps -Xloggc:/tmp/log/run/gc-%t.log" \

--conf "spark.speculation=true" \
--conf "spark.speculation.quantile=0.95" \

--conf "spark.kryoserializer.buffer.max=1024m" \

--conf "spark.sql.hive.metastorePartitionPruning=true" \
--conf "spark.sql.optimizer.metadataOnly=true" \
--conf "spark.sql.parquet.filterPushdown=true" \
--conf "spark.sql.hive.caseSensitiveInferenceMode=NEVER_INFER" \
--conf "spark.sql.hive.convertMetastoreParquet=false" \

redis系列:通过文章点赞排名案例学习sortedset命令

Posted on 2018-09-07 | In Redis |

redis系列:通过文章点赞排名案例学习sortedset命令

JVM源码分析之SystemGC完全解读

Posted on 2018-09-07 | In Java |

JVM源码分析之SystemGC完全解读

概述

JVM的GC一般情况下是JVM本身根据一定的条件触发的,不过我们还是可以做一些人为的触发,比如通过jvmti做强制GC,通过System.gc触发,还可以通过jmap来触发等,针对每个场景其实我们都可以写篇文章来做一个介绍,本文重点介绍下System.gc的原理

或许大家已经知道如下相关的知识

  • system.gc其实是做一次full gc
  • system.gc会暂停整个进程
  • system.gc一般情况下我们要禁掉,使用-XX:+DisableExplicitGC
  • system.gc在cms gc下我们通过-XX:+ExplicitGCInvokesConcurrent来做一次稍微高效点的GC(效果比Full GC要好些)
  • system.gc最常见的场景是RMI/NIO下的堆外内存分配等

如果你已经知道上面这些了其实也说明你对System.gc有过一定的了解,至少踩过一些坑,但是你是否更深层次地了解过它,比如

  • 为什么CMS GC下-XX:+ExplicitGCInvokesConcurrent这个参数加了之后会比真正的Full GC好?
  • 它如何做到暂停整个进程?
  • 堆外内存分配为什么有时候要配合System.gc?

如果你上面这些疑惑也都知道,那说明你很懂System.gc了,那么接下来的文字你可以不用看啦

JDK里的System.gc的实现

先贴段代码吧(java.lang.System)

发现主要调用的是Runtime里的gc方法(java.lang.Runtime)

这里看到gc方法是native的,在java层面只能到此结束了,代码只有这么多,要了解更多,可以看方法上面的注释,不过我们需要更深层次地来了解其实现,那还是准备好进入到jvm里去看看

Hotspot里System.gc的实现

如何找到native里的实现

上面提到了Runtime.gc是一个本地方法,那需要先在jvm里找到对应的实现,这里稍微提一下jvm里native方法最常见的也是最简单的查找,jdk里一般含有native方法的类,一般都会有一个对应的c文件,比如上面的java.lang.Runtime这个类,会有一个Runtime.c的文件和它对应,native方法的具体实现都在里面了,如果你有source,可能会猜到和下面的方法对应

其实没错的,就是这个方法,jvm要查找到这个native方法其实很简单的,看方法名可能也猜到规则了,Java_pkgName_className_methodName,其中pkgName里的”.“替换成”_“,这样就能找到了,当然规则不仅仅只有这么一个,还有其他的,这里不细说了,有机会写篇文章详细介绍下其中细节

DisableExplicitGC参数

上面的方法里是调用JVM_GC(),实现如下

看到这里我们已经解释其中一个疑惑了,就是DisableExplicitGC这个参数是在哪里生效的,起的什么作用,如果这个参数设置为true的话,那么将直接跳过下面的逻辑,我们通过-XX:+ DisableExplicitGC就是将这个属性设置为true,而这个属性默认情况下是true还是false呢

ExplicitGCInvokesConcurrent参数

这里主要针对CMSGC下来做分析,所以我们上面看到调用了heap的collect方法,我们找到对应的逻辑

collect里一开头就有个判断,如果should_do_concurrent_full_gc返回true,那会执行collect_mostly_concurrent做并行的回收

其中should_do_concurrent_full_gc中的逻辑是如果使用CMS GC,并且是system gc且ExplicitGCInvokesConcurrent==true,那就做并行full gc,当我们设置-XX:+ ExplicitGCInvokesConcurrent的时候,就意味着应该做并行Full GC了,不过要注意千万不要设置-XX:+DisableExplicitGC,不然走不到这个逻辑里来了

并行Full GC相对正常的Full GC效率高在哪里

stop the world

说到GC,这里要先提到VMThread,在jvm里有这么一个线程不断轮询它的队列,这个队列里主要是存一些VM_operation的动作,比如最常见的就是内存分配失败要求做GC操作的请求等,在对gc这些操作执行的时候会先将其他业务线程都进入到安全点,也就是这些线程从此不再执行任何字节码指令,只有当出了安全点的时候才让他们继续执行原来的指令,因此这其实就是我们说的stop the world(STW),整个进程相当于静止了

CMS GC

这里必须提到CMS GC,因为这是解释并行Full GC和正常Full GC的关键所在,CMS GC我们分为两种模式background和foreground,其中background顾名思义是在后台做的,也就是可以不影响正常的业务线程跑,触发条件比如说old的内存占比超过多少的时候就可能触发一次background式的cms gc,这个过程会经历CMS GC的所有阶段,该暂停的暂停,该并行的并行,效率相对来说还比较高,毕竟有和业务线程并行的gc阶段;而foreground则不然,它发生的场景比如业务线程请求分配内存,但是内存不够了,于是可能触发一次cms gc,这个过程就必须是要等内存分配到了线程才能继续往下面走的,因此整个过程必须是STW的,因此CMS GC整个过程都是暂停应用的,但是为了提高效率,它并不是每个阶段都会走的,只走其中一些阶段,这些省下来的阶段主要是并行阶段,Precleaning、AbortablePreclean,Resizing这几个阶段都不会经历,其中sweep阶段是同步的,但不管怎么说如果走了类似foreground的cms gc,那么整个过程业务线程都是不可用的,效率会影响挺大。CMS GC具体的过程后面再写文章详细说,其过程确实非常复杂的

正常的Full GC

正常的Full GC其实是整个gc过程包括ygc和cms gc(这里说的是真正意义上的Full GC,还有些场景虽然调用Full GC的接口,但是并不会都做,有些时候只做ygc,有些时候只做cms gc)都是由VMThread来执行的,因此整个时间是ygc+cms gc的时间之和,其中CMS GC是上面提到的foreground式的,因此整个过程会比较长,也是我们要避免的

并行的Full GC

并行Full GC也通样会做YGC和CMS GC,但是效率高就搞在CMS GC是走的background的,整个暂停的过程主要是YGC+CMS_initMark+CMS_remark几个阶段

堆外内存常配合使用System GC

这里说的堆外内存主要针对java.nio.DirectByteBuffer,这些对象的创建过程会通过Unsafe接口直接通过os::malloc来分配内存,然后将内存的起始地址和大小存到java.nio.DirectByteBuffer对象里,这样就可以直接操作这些内存。这些内存只有在DirectByteBuffer回收掉之后才有机会被回收,因此如果这些对象大部分都移到了old,但是一直没有触发CMS GC或者Full GC,那么悲剧将会发生,因为你的物理内存被他们耗尽了,因此为了避免这种悲剧的发生,通过-XX:MaxDirectMemorySize来指定最大的堆外内存大小,当使用达到了阈值的时候将调用System.gc来做一次full gc,以此来回收掉没有被使用的堆外内存,具体堆外内存是如何回收的,其原理机制又是怎样的,还是后面写篇详细的文章吧

Minor GC、Major GC和Full GC之间的区别

Posted on 2018-09-07 | In Java |

Minor GC、Major GC和Full GC之间的区别

JMC和JCMD使用

Posted on 2018-08-23 | In Java |

使用JCMD排查问题

另一份Java应用调优指南之-前菜

另一份Java应用调优指南之-工具篇

Java Mission Control之使用

1
2
3
4
5
6
7
8
9
-Dcom.sun.management.jmxremote=true 
-Djava.rmi.server.hostname=**
-Dcom.sun.management.jmxremote.port=6666
-Dcom.sun.management.jmxremote.ssl=false
-Dcom.sun.managementote.ssl=false
-Dcom.sun.management.jmxremote.authenticate=false
# 下面是 Java Flight Recorder 取样分析
-XX:+UnlockCommercialFeatures
-XX:+FlightRecorder

Javacpu 和内存问题排查步骤:

  1. ps ux 查看运行的进程
  2. top -c查看占用cpu的进程
  3. top -bn1 -H -p / top -Hp 查看占用cpu的线程 // 找出cpu高的线程tid ps -mp -o THREAD,tid,time | sort -rn
  4. jstack 查看线程运行情况 // 转换线程tidprintf “%x\n”
  5. jmap -heap 查看内存占用情况

JDK的bin下的工具有哪些功能

Posted on 2018-08-23 | In Java |

Java生产环境下问题排查

JVM源码分析之Jstat工具原理完全解读

Java内存泄漏分析系列之一:使用jstack定位线程堆栈信息

Java内存泄漏分析系列之二:jstack生成的Thread Dump日志结构解析

Java内存泄漏分析系列之三:jstat命令的使用及VM Thread分析

Java内存泄漏分析系列之四:jstack生成的Thread Dump日志线程状态

Java内存泄漏分析系列之五:常见的Thread Dump日志案例分析

Java内存泄漏分析系列之六:JVM Heap Dump(堆转储文件)的生成和MAT的使用

查看JVM参数

jps -l 查看所有正在运行的Java程序,同时显示启动类类名,获取到PID

1
2
4706 org.apache.catalina.startup.Bootstrap
5023 sun.tools.jps.Jps

jinfo -flags PID 查看运行时进程参数与JVM参数

1
2
3
4
5
6
Attaching to process ID 28987, please wait...
Debugger attached successfully.
Server compiler detected.
JVM version is 25.171-b11
Non-default VM flags: -XX:CICompilerCount=3 -XX:InitialHeapSize=132120576 -XX:MaxHeapSize=2092957696 -XX:MaxNewSize=697303040 -XX:MinHeapDeltaBytes=524288 -XX:NewSize=44040192 -XX:OldSize=88080384 -XX:+UseCompressedClassPointers -XX:+UseCompressedOops -XX:+UseParallelGC
Command line: -Dspring.config.location=application.properties -Dspring.profiles.active=staging

java -XX:+PrintFlagsFinal -version 查看当前虚拟机默认JVM参数

查看即时GC状态

jstat -gc PID 1000 10 每秒查看一次gc信息,共10次

输出比较多的参数,每个字段的解释参看 https://docs.oracle.com/javase/8/docs/technotes/tools/unix/jstat.html

1
2
S0C    S1C    S0U    S1U      EC       EU        OC         OU       MC     MU    CCSC   CCSU   YGC     YGCT    FGC    FGCT     GCT   
512.0 512.0 15.3 0.0 4416.0 1055.2 11372.0 7572.5 14720.0 14322.5 1664.0 1522.8 40 0.137 8 0.039 0.176

期间可能碰到提示sun.jvm.hotspot.runtime.VMVersionMismatchException: Supported versions are 24.181-b01. Target VM is 25.171-b11的问题,原因在于安装了多个版本,使用which、ls -l可简介定位到与当前执行Java程序相同的Java版本

内存问题

内存泄露导致OOM?内存占用异常的高?这是生产环境常常出现的问题,Java提供dump文件供我们对内存里发生过的事情进行了记录,我们需要借助一些工具从中获取有价值的信息。

导出Dump文件

  1. 提前对Java程序加上这些参数印dump文件 -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=./
  2. 对正在运行的程序使用jmap:jmap -dump:format=b,file=heap.hprof PID

分析Dump文件

如果Dump文件不太大的话,可以传到 http://heaphero.io/index.jsp 来分析

文件比较大,且想进行更加系统的分析,推荐使用MAT分析,有如下几种常用查看方式

  1. 首页中的【Leak Suspects】能推测出问题所在
  2. 点击【Create a histogram from an arbitrary set of objects】查到所有对象的数量
  3. 右键点击某个对象【Merge Shortest Paths to GC Roots】-> 【exclude all phantom/weak/soft etc. references】能查询到大量数量的某个对象是哪个GC ROOT引用的

线程问题

任务长时间不退出?CPU 负载过高?很可能因为死循环或者死锁,导致某些线程一直执行不被中断,但是不报错是最烦人的,所以日志里看不到错误信息,并且又不能用dump文件分析,因为跟内存无关。这个时候就需要用线程分析工具来帮我们了。

导出jstack文件

使用jstack PID > 文件,如果失败请加-F参数,如果还失败请使用Java程序启动时使用的用户执行jstack

排查步骤

  1. top 查看到哪个java程序负载高
  2. top -p PID -H 查看该进程所有进程的运行状态
  3. 记录下高负载的线程ID,printf "&x" PID转换成16进制
  4. jstack PID > 文件
  5. 在jstack文件中用转换成16进制之后的线程ID查询线程运行堆栈
  6. 从堆栈中了解到线程在执行什么任务,并结合业务与代码判断问题所在

以下转载自: http://www.codingwhy.com/view/858.html

Java开发人员肯定都知道JDK的bin目录中有“java”、“javac”这两个命令行工具,但并非所有的程序员都了解过JDK的bin目录之中的其他工具的作用。

这些工具被Sun公司作为“礼物”附赠给JDK的使用者,并在软件的使用说明中把他们申明为“没有技术支持并且是试验性质的(unsupported and experimental)”的产品,但事实上,这些工具都非常的稳定且功能强大,能在处理应用程序性能问题、定位故障时发挥很大的作用。

细心的可能会发现,这些工具都非常小,是因为这些工具大多是jdk/lib/tools.jar类库的一层包装而已,他们主要的功能代码是在tools类库中实现的。

Java故障检修,程序概要分析,监视和管理工具

工具名称 用途
jvisualvm 一个图形化的Java虚拟机 参考: Java jvisualvm简要说明
jconsole java监视台和管理控制台 参考: 如何利用 JConsole观察分析Java程序的运行,进行排错调优 https://www.ibm.com/support/knowledgecenter/zh/SSYKE2_8.0.0/com.ibm.java.vm.80.doc/docs/jconsole.html
jps JVM Process Status进程状态工具。列出目标系统的HotSpot JJVM 可以列出本机所有java进程的pid
jstat 按照命令行的具体要求记录和收集一个JVM的性能数据
jstatd JVM jstat 的守护进程
jmc Java任务控制工具(Java Mission Control),主要用于HotSpot JVM的生产时间监测、分析、诊断。

故障检测和修理工具

工具名称 用途
jinfo 配置或打印某个Java进程VM flag
jhat 堆储存查看器
jmap Java内存图
jsadebugd Java的 Serviceability Agent Debug的守护进程
jstack Java堆栈跟踪

JVM性能调优监控工具jps、jstack、jmap、jhat、jstat、hprof使用详解

基本工具

这些工具是JDK的基础,用这些工具来编写应用程序。

工具名称 用途
javac Java语言编译器
java Java应用程序启动器
javaw Java运行工具,用于运行.class字节码文件或.jar文件,但不会显示控制台输出信息,适用于运行图形化程序。
javadoc Java API文档生成器
apt java 注解处理器 「深入Java」Annotation注解
appletviewer java applet小程序查看器
jar java文件压缩打包工具
jdb Java调试器
javah C头文件和stub生成器,用于写本地化方法,例如生产JNI样式的头文件
javap class文件反编译工具
extcheck 用于检测jar包中的问题
jcmd Java命令行(Java Command),用于向正在运行的JVM发送诊断命令请求。

安全工具

这些工具用于设置系统的安全规则和生产可以工作在远端的安全规则下的应用程序

工具名称 用途
keytool 管理密钥库和证书
jarsigner 生产和校验JAR签名
policytool 有用户界面的规则管理工具
kinit 用于获得和缓存网络认证协议Kerberos 票证的授予票证
klist 凭据高速缓存和密钥表中的 Kerberos 显示条目
ktab 密钥和证书管理工具

Java国际化工具

这些工具可以帮助你创建可本地化的应用程序

native2ascii

远程方法调用工具

这些工具可以帮助创建可以和web和网络交互的应用程序

工具名称 用途
rmic 生成远程对象的stubs and skeletons(存根和框架)
rmid Java远程方法调用(RMI:Remote Method Invocation)活化系统守护进程
rmiregistry Java远程对象注册表
serialver 返回类的 serialVersionUID
java-rmi Java远程方法调用(Java Remote Method Invocation)工具,主要用于在客户机上调用远程服务器上的对象

Java IDL and RMI-IIOP 工具

这些工具用于创建使用OMG-Standard IDL 和 CORBA/IIOP 的应用程序

工具名称 用途
tnameserv Java IDL瞬时命名服
idlj 生产映射到OMG IDL接口可以使Java应用程序使用CORBA的.java文件
orbd 为客户可以在CORBA环境下透明的定位和调用服务器的稳定的对象提供支持
servertool 为应用程序提供易于使用的接口用于注册,注销,启动,关闭服务器

Java部署工具

工具名称 用途
pack200 使用java gzip压缩工具将JAR文件转换为压缩的pack200文件,生产打包文件是高度压缩的JAR包,可以直接部署,减少下载时间
unpack200 解包pack200文件为JARs

Java web工具

工具名称 用途
javaws Java web 启动命令行工具
schemagen Java构架的XML Schema生成器
wsgen 生成 JAX-WS
wsimport 生成 JAX-WS
xjc 绑定编译器

Java脚本工具

工具名称 用途
jrunscript 运行脚本

其他工具

工具名称 用途
jabswitch Java Access Bridge Switch的简称,用于控制Java访问桥的开/关。Java访问桥是一种技术,让Java应用程序实现Accessibility API,以供Microsoft Windows系统的辅助技术访问。
javafxpackager JavaFX打包工具

实时流计算+SparkStreaming+Kafka+Redis+Exactly-once+实时去重

Posted on 2018-02-27 | In Spark |

http://lxw1234.com/archives/2018/02/901.htm

Spark遇到中文乱码问题如何解决

Posted on 2018-01-16 | In Spark |

Spark 遇到中文乱码问题如何解决?

  • 问题原因:
    一般是spark平台机器环境编码设置问题

  • 解决方法:

  1. 在spark-default中为executor加上-Dfile.encoding=UTF-8
    spark/conf/spark-defaults.conf

    1
    spark.executor.extraJavaOptions -XX:+UseParallelGC -XX:+UseParallelOldGC -XX:ParallelGCThreads=4 -XX:NewRatio=3 -XX:SurvivorRatio=3 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:+PrintTenuringDistribution -XX:+PrintGCApplicationStoppedTime -Dfile.encoding=UTF-8
  2. 在所有涉及到字节转换时,一定要指定编码方式

    1
    2
    3
    4
    5
    String -> Byte: 
    string.getBytes("UTF-8")

    Byte -> String:
    new String(bytes, "UTF-8")

Spark动态资源分配

Posted on 2018-01-16 | In Spark |

1. Yarn模式下资源分配

在Yarn模式下,ResourceManager(以后简称RM)会为每一个应用程序分配固定数量的Executors(默认是2个,可通过–num-executors设置),且为每个Executor分配固定的CPU和内存(默认1个CPU和512M内存,可通过-executor-memory and和–executor-cores设置 )不同应用程序的Executors之间是互相独立的,每个应用程序的Executors只负责自己的应用程序的任务运行和数据存储。每个Executor是一个独立的JVM进程,JVM之间内存是无法共享的。

2. 动态资源分配的简介

动态资源调度是在Spark1.2之后才开始加进去的,

①原理:当一个应用程序不需要使用资源,且后续没有资源申请的时候,它就将现有的资源交回给集群,这样集群便可以将资源分配给其他应用程序来使用。当该应用程序再次需要资源的时候就再去向集群申请。

②优点:当集群中有很多应用程序的时候可以提高集群的资源利用率,比如集群中有一个应用程序作为服务一直在执行,集群给他分配了12个CPU和8G内存, 默认情况下,该应用会一直占用12个CPU和8G内存的资源,直到程序结束,如果目前该应用程序运行不需要这么多的资源,资源处于空闲状态的时候,集群也 不会把该资源分配给其他的应用程序,从而造成的资源的浪费。所以使用动态资源分配可以提高资源利用率。

③缺点:动态资源分配的调度算法如果不合理可能造成应用程序的高延迟,一种情况是某个应用程序刚把资源交回给集群,下一刻就又需要使用资源,然后在去向集群申请。

3. 动态资源分配的配置

①修改配置文件spark-defaults.conf,添加如下内容

1
2
3
4
5
6
7
8
9
10
11
12
spark.dynamicAllocation.enabled   true   //开启动态资源分配
spark.shuffle.service.enabled true //启用External shuffle Service服务
//shuffle.service是必须要启动的


spark.dynamicAllocation.minExecutors 1 //每个Application最小分配的executor数
spark.dynamicAllocation.maxExecutors 30 //每个Application最大并发分配的executor数

spark.shuffle.service.port 7337 //Shuffle Service服务端口,必须和yarn-site中的一致

spark.dynamicAllocation.schedulerBacklogTimeout 1s
spark.dynamicAllocation.sustainedSchedulerBacklogTimeout 5s

②在$SPARK_HOME/lib目录下找到spark--yarn-shuffle.jar文件,将该jar文件添加到NodeManager节点的classpath环境中。

③修改yarn-site.xml文件,设置如下参数

1
2
3
4
5
6
7
8
9
10
11
12
<?xml version="1.0"?>
<configuration>
...........
<property>
<name>yarn.nodemanager.aux-services</name>
<value>spark_shuffle</value>
</property>
<property>
<name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
<value>org.apache.spark.network.yarn.YarnShuffleService</value>
</property>
</configuration>

④重新启动NodeManager。保证每个NodeManager节点读能找到spark-1.3.0-yarn-shuffle.jar该文件,否则NodeManager无法启动。我就是因为这个原因,导致花了很长时间才调试通。

4. 资源分配策略

资源的动态分配要求Spark在资源空闲的时候放弃Executors资源,在需要运行任务的时候申请Executors资源。但是很难预测一个即将放弃 Executors资源的应用程序下一刻是否会运行任务,或者一个刚申请的Executor在下一刻是否会处于空闲状态。

①请求策略

资源动态分配模式下,如果某个Spark应用程序有某些tasks处于等待运行状态时,该应用程序就会向集群申请更多的Executors资源,这意味着 集群为该应用程序分配的Executors资源不足以满足它提交的正在运行的,且还没有运行结束的tasks。Spark请求Executors采用的是 轮询的方式,当有tasks处于等待状态spark.dynamicAllocation.schedulerBacklogTimeout秒后会触发一次Executors请求,如果之后tasks一直处于等待状态,那么每隔spark.dynamicAllocation.sustainedSchedulerBacklogTimeout秒会触发一次Executors请求,每次轮询中申请的Executors个数以指数形式增长,比如第一个轮询申请1个Executor,之后会依次申请2,4,8…个Executors。

需要关注两点,一是谨慎申请Executors资源,以免申请到的Executors资源只有一小部分处于运行状态,其他的大部分处于空闲状态。二是充分利用申请到的Executors资源,以防止后续有大量Executors资源申请的情况出现。

②删除策略

当某个Spark应用程序的某个Executor处于空闲状态spark.dynamicAllocation.executorIdleTimeout秒后就会删除该Executor,注意,如果有task处于等待状态的时候,该Executor就不会处于空闲状态。

5. 如何有效的删除Executor

在默认情况下,只有在Executor失败或者它所属的应用程序退出的情况下Executor才会结束,这两种情况下Executor的状态都不需要保 存,可以直接删除。在动态资源分配情况下,在Executor退出之前应用程序还在执行,所以在Executor的被删除之前需要将它的状态需要保存下 来,等下次执行Executor的时候可以重用。对于Shuffle保存Executor的状态尤其重要,在Shuffle阶段Executor会将 Map阶段的输出结果保存到本地文件系统中,然后在Reduce阶段其他的Executors会从该文件系统中读取Map阶段的输出结果。动态资源分配模 式下,一个Executor有可能在它的Shuffle还没有完成之前就将该Executor删除了,这就导致Shuffle阶段Executor保存的 文件在下次重新启动Executor的时候需要进行不必要的重新计算。

解决方法是使用额外的Shuffle Service,这个特性是在Spark1.2之后加进来的,Shuffle Service是运行在集群每个节点上的服务,它与Spark应用程序及其Executors是相互独立的,如果Shuffle Service在运行,Spark的Executors就会从Shuffle Service获取Shuffle阶段保存的文件,而不是去每个节点获取。这就意味着一个Executors被删除之后,它在Shuffle阶段的状态信 息还一直运行在Shuffle Service中。

6. 动态资源分配相关配置

红色参数比较重要,在使用动态资源分配的时候最好自行设置。

属性名 默认值 说明
spark.dynamicAllocation.enabled false 是否启动动态资源分配,默认不启动,目前只支持Yarn模式,Standalone和Mesoso暂不支持
spark.dynamicAllocation.executorIdleTimeout 600 如果某个Executor在该时间段内处于空闲状态,那么它将被删除,其所占用的资源将返还给集群
spark.dynamicAllocation.initialExecutors spark.dynamicAllocation.minExecutors 动态资源分配启动时初始化的Executor的个数,默认情况下与spark.dynamicAllocation.minExecutors配置的值相同,如果启动动态资源分配,那么该值必须大于0,否则应用程序会因为没有资源而无法运行
spark.dynamicAllocation.maxExecutors Integer.MAX_VALUE 动态资源分配的Executors的最大个数
spark.dynamicAllocation.minExecutors 0 分配的最小Executors个数,默认为0
spark.dynamicAllocation.schedulerBacklogTimeout 5 如果有tasks处于等待状态,多少秒后开始向集群申请Executors资源,默认是5秒
spark.dynamicAllocation.sustainedSchedulerBacklogTimeout schedulerBacklogTimeout 如果申请的Executors资源无法满足当前Tasks的需求,那么采用轮询的方式向集群申请Executors资源,该值为轮询的时间间隔,默认值与schedulerBacklogTimeout相同

Spark-on-Yarn资源调度和作业调度

Posted on 2018-01-16 | In Spark |

作业调度

Spark默认采取FIFO策略运行多个Jobs,它提供一个队列来保存已经提交的Jobs,如果队头的Job不需要占用所有的集群资源,那么后续的 Job可以立即运行,但是如果队头的Job需要占用所有的集群资源,且运行时间很长,那么即使后续的Job很小,也要等待前面的Job执行完后才可以执 行,这样就造成了大量的延迟。

Spark0.8+版本开始支持公平调度策略,在该策略下,Spark以round robin的方式为每个Job分配执行的Tasks,每个Job在同一时间都可以运行多个Tasks,Jobs之间是共享集群资源的。这就意味着当一个大 任务运行的同时,用户提交的小任务可以立即执行,且获得良好的响应时间,而不用去等待大任务的结束。

通过

1
2
3
4
``` scala
val conf = new SparkConf().setMaster(...).setAppName(...)
conf.set("spark.scheduler.mode", "FAIR")
val sc = new SparkContext(conf)

1. 公平调度池

Spark支持将不同的Jobs划分到不同的调度池中,可以为每个调度池设置不同的属性(比如权重),这样就可以将重要的Job分到高优先级的调度池中, 比如可以为每个用户创建一个调度池,并为每个用户划分相等的集群资源,而不用去管每个用户同时运行多少个Job。

如果用户没有设置公平调度池,默认情况下提交的任务被划分到default调度池中,可通过

1
2
``` scala
sc.setLocalProperty("spark.scheduler.pool", "pool1")

清除调度池

1
sc.setLocalProperty("spark.scheduler.pool", null)

2. 默认调度池

默认情况下不同的调度池共享集群的资源,但在每个调度池内部,Job是以FIFO的方式运行的,比如为每个用户创建一个调度池,集群的资源平均分配给各个调度池,在调度池内部,每个Job按照FIFO的顺序运行。

3. 调度池的属性

schedulingMode:调度池内部Job之间的调度顺序,FIFO或FAIR,默认是FIFO
weight:调度池的权重,默认是1,如果设置为2,那么它将获取相当于其他调度池2倍的资源。
minShare:最少满足资源数,默认是0,只有在满足每个调度池minShare资源数的基础之上,才会按照weight比例来获取额外的资源。

可创建一个类似于

1
2
3
``` scala
conf.set("spark.scheduler.allocation.file", "/path/to/file")
fairscheduler.xml.template文件内容

fairscheduler.xml.template文件内容

1
2
3
4
5
6
7
8
9
10
11
12
13
<?xml version="1.0"?>
<allocations>
<pool name="production">
<schedulingMode>FAIR</schedulingMode>
<weight>1</weight>
<minShare>2</minShare>
</pool>
<pool name="test">
<schedulingMode>FIFO</schedulingMode>
<weight>2</weight>
<minShare>3</minShare>
</pool>
</allocations>

4. 测试

myFairScheduler.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
<?xml version="1.0"?>
<allocations>
<pool name="onlinetestpool">
<schedulingMode>FAIR</schedulingMode>
<weight>2</weight>
<minShare>7</minShare>
</pool>
<pool name="default">
<schedulingMode>FAIR</schedulingMode>
<weight>1</weight>
<minShare>7</minShare>
</pool>
</allocations>

启动Spark应用
spark-shell \
–conf spark.scheduler.mode=FAIR \
–conf spark.scheduler.pool=onlinetestpool \
–conf spark.scheduler.allocation.file=/home/spark/spark/conf/fairscheduler.xml

12…11

Sun Ke

104 posts
21 categories
61 tags
© 2018 Sun Ke
Powered by Hexo
|
Theme — NexT.Pisces v5.1.4