分享

spark core源码分析

xuanxufeng 2015-11-29 20:10:59 发表于 代码分析 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 0 13363
问题导读
1.spark.shuffle.consolidateFiles是为了解决什么问题?
2.spark.shuffle.spill的作用是什么?
3.spark.local.dir默认是什么?





spark 参数详解

一、Shuffle 相关
1、spark.shuffle.manager(默认 sort)

HashShuffleManager,故名思义也就是在Shuffle的过程中写数据时不做排序操作,只是将数据根据Hash的结果,将各个Reduce分区的数据写到各自的磁盘文件中。带来的问题就是如果Reduce分区的数量比较大的话,将会产生大量的磁盘文件。如果文件数量特别巨大,对文件读写的性能会带来比较大的影响,此外由于同时打开的文件句柄数量众多,序列化,以及压缩等操作需要分配的临时内存空间也可能会迅速膨胀到无法接受的地步,对内存的使用和GC带来很大的压力,在Executor内存比较小的情况下尤为突出,例如Spark on Yarn模式。

SortShuffleManager(目前是默认值),在写入分区数据的时候,首先会根据实际情况对数据采用不同的方式进行排序操作,底线是至少按照Reduce分区Partition进行排序,这样来至于同一个Map任务Shuffle到不同的Reduce分区中去的所有数据都可以写入到同一个外部磁盘文件中去,用简单的Offset标志不同Reduce分区的数据在这个文件中的偏移量。这样一个Map任务就只需要生成一个shuffle文件,从而避免了上述HashShuffleManager可能遇到的文件数量巨大的问题

两者的性能比较,取决于内存,排序,文件操作等因素的综合影响。

对于不需要进行排序的Shuffle操作来说,如repartition等,如果文件数量不是特别巨大,HashShuffleManager面临的内存问题不大,而SortShuffleManager需要额外的根据Partition进行排序,显然HashShuffleManager的效率会更高。

而对于本来就需要在Map端进行排序的Shuffle操作来说,如ReduceByKey等,使用HashShuffleManager虽然在写数据时不排序,但在其它的步骤中仍然需要排序,而SortShuffleManager则可以将写数据和排序两个工作合并在一起执行,因此即使不考虑HashShuffleManager的内存使用问题,SortShuffleManager依旧可能更快。

2、spark.shuffle.sort.bypassMergeThreshold
这个参数仅适用于SortShuffleManager,如前所述,SortShuffleManager在处理不需要排序的Shuffle操作时,由于排序带来性能的下降。这个参数决定了在这种情况下,当Reduce分区的数量小于多少的时候,在SortShuffleManager内部不使用Merge Sort的方式处理数据,而是与Hash Shuffle类似,直接将分区文件写入单独的文件,不同的是,在最后一步还是会将这些文件合并成一个单独的文件。这样通过去除Sort步骤来加快处理速度,代价是需要并发打开多个文件,所以内存消耗量增加,本质上是相对HashShuffleMananger一个折衷方案。 这个参数的默认值是200个分区,如果内存GC问题严重,可以降低这个值。

3、spark.shuffle.consolidateFiles(默认 false)
这个配置参数仅适用于HashShuffleMananger的实现,同样是为了解决生成过多文件的问题,采用的方式是在不同批次运行的Map任务之间重用Shuffle输出文件,也就是说合并的是不同批次的Map任务的输出数据,但是每个Map任务所需要的文件还是取决于Reduce分区的数量,因此,它并不减少同时打开的输出文件的数量,因此对内存使用量的减少并没有帮助。只是HashShuffleManager里的一个折中的解决方案。
需要注意的是,这部分的代码实现尽管原理上说很简单,但是涉及到底层具体的文件系统的实现和限制等因素,例如在并发访问等方面,需要处理的细节很多,因此一直存在着这样那样的bug或者问题,导致在例如EXT3上使用时,特定情况下性能反而可能下降,因此从Spark 0.8的代码开始,一直到Spark 1.1的代码为止也还没有被标志为Stable,不是默认采用的方式。此外因为并不减少同时打开的输出文件的数量,因此对性能具体能带来多大的改善也取决于具体的文件数量的情况。所以即使你面临着Shuffle文件数量巨大的问题,这个配置参数是否使用,在什么版本中可以使用,也最好还是实际测试以后再决定。
如果为true,在shuffle时就合并中间文件,对于有大量Reduce任务的shuffle来说,合并文件可 以提高文件系统性能,如果使用的是ext4 或 xfs 文件系统,建议设置为true;对于ext3,由于文件系统的限制,设置为true反而会使内核>8的机器降低性能

4、spark.shuffle.spill(默认 true)shuffle的过程中,如果涉及到排序,聚合等操作,势必会需要在内存中维护一些数据结构,进而占用额外的内存。如果内存不够用怎么办,那只有两条路可以走,一就是out of memory 出错了,二就是将部分数据临时写到外部存储设备中去,最后再合并到最终的Shuffle输出文件中去。
这里spark.shuffle.spill 决定是否Spill到外部存储设备(默认打开),如果你的内存足够使用,或者数据集足够小,当然也就不需要Spill,毕竟Spill带来了额外的磁盘操作。
如果为true,在shuffle期间通过溢出数据到磁盘来降低了内存使用总量,溢出阈值是由spark.shuffle.memoryFraction指定的

5、spark.shuffle.memoryFraction / spark.shuffle.safetyFraction
在启用Spill的情况下,spark.shuffle.memoryFraction(1.1后默认为0.2)决定了当Shuffle过程中使用的内存达到总内存多少比例的时候开始Spill。

通过spark.shuffle.memoryFraction可以调整Spill的触发条件,即Shuffle占用内存的大小,进而调整Spill的频率和GC的行为。总的来说,如果Spill太过频繁,可以适当增加spark.shuffle.memoryFraction的大小,增加用于Shuffle的内存,减少Spill的次数。当然这样一来为了避免内存溢出,对应的可能需要减少RDD cache占用的内存,即减小spark.storage.memoryFraction的值,这样RDD cache的容量减少,有可能带来性能影响,因此需要综合考虑。
由于Shuffle数据的大小是估算出来的,一来为了降低开销,并不是每增加一个数据项都完整的估算一次,二来估算也会有误差,所以实际暂用的内存可能比估算值要大,这里spark.shuffle.safetyFraction(默认为0.8)用来作为一个保险系数,降低实际Shuffle使用的内存阀值,增加一定的缓冲,降低实际内存占用超过用户配置值的概率。

6、spark.shuffle.spill.compress / spark.shuffle.compress(true)
这两个配置参数都是用来设置Shuffle过程中是否使用压缩算法对Shuffle数据进行压缩,前者针对Spill的中间数据,后者针对最终的shuffle输出文件,默认都是True

理论上说,spark.shuffle.compress设置为True通常都是合理的,因为如果使用千兆以下的网卡,网络带宽往往最容易成为瓶颈。此外,目前的Spark任务调度实现中,以Shuffle划分Stage,下一个Stage的任务是要等待上一个Stage的任务全部完成以后才能开始执行,所以shuffle数据的传输和CPU计算任务之间通常不会重叠,这样Shuffle数据传输量的大小和所需的时间就直接影响到了整个任务的完成速度。但是压缩也是要消耗大量的CPU资源的,所以打开压缩选项会增加Map任务的执行时间,因此如果在CPU负载的影响远大于磁盘和网络带宽的影响的场合下,也可能将spark.shuffle.compress 设置为False才是最佳的方案.如果压缩将使用spark.io.compression.codec。
对于spark.shuffle.spill.compress而言,情况类似,但是spill数据不会被发送到网络中,仅仅是临时写入本地磁盘,而且在一个任务中同时需要执行压缩和解压缩两个步骤,所以对CPU负载的影响会更大一些,而磁盘带宽(如果标配12HDD的话)可能往往不会成为Spark应用的主要问题,所以这个参数相对而言,或许更有机会需要设置为False。如果压缩将使用spark.io.compression.codec。
总之,Shuffle过程中数据是否应该压缩,取决于CPU/DISK/NETWORK的实际能力和负载,应该综合考虑。

7、spark.shuffle.file.buffer.kb(默认 100)
每个shuffle的文件输出流内存缓冲区的大小,以KB为单位。这些缓冲区可以减少磁盘寻道的次数,也减少创建shuffle中间文件时的系统调用
8、spark.reducer.maxMbInFlight(默认 48)
每个reduce任务同时获取map输出的最大大小 (以兆字节为单位)。由于每个map输出都需要一个缓冲区来接收它,这代表着每个 reduce 任务有固定的内存开销,所以要设置小点,除非有很大内存


二、storage 相关
1、spark.local.dir(默认 /tmp)
这个看起来很简单,就是Spark用于写中间数据,如RDD Cache,Shuffle,Spill等数据的位置,那么有什么可以注意的?
首先,最基本的当然是我们可以配置多个路径(用逗号分隔)到多个磁盘上增加整体IO带宽,这个大家都知道
其次,目前的实现中,Spark是通过对文件名采用hash算法分布到多个路径下的目录中去,如果你的存储设备有快有慢,比如SSD+HDD混合使用,那么你可以通过在SSD上配置更多的目录路径来增大它被Spark使用的比例,从而更好地利用SSD的IO带宽能力。当然这只是一种变通的方法,终极解决方案还是应该像目前HDFS的实现方向一样,让Spark能够感知具体的存储设备类型,针对性的使用
用于保存map输出文件或者转储RDD。在Spark 1.0 及更高版本此属性会被环境变量 SPARK_LOCAL_DIRS (Standalone、Mesos) 或 LOCAL_DIRS (YARN) 代替

2、spark.executor.memory(默认 512 m)
每个executor使用的内存大小,和性能本身当然并没有直接的关系,但是几乎所有运行时性能相关的内容都或多或少间接和内存大小相关。这个参数最终会被设置到Executor的JVM的heap尺寸上,对应的就是Xmx和Xms的值
理论上Executor 内存当然是多多益善,但是实际受机器配置,以及运行环境,资源共享,JVM GC效率等因素的影响,还是有可能需要为它设置一个合理的大小。 多大算合理,要看实际情况Executor的内存基本上是Executor内部所有任务共享的,而每个Executor上可以支持的任务的数量取决于Executor所管理的CPU Core资源的多少,因此你需要了解每个任务的数据规模的大小,从而推算出每个Executor大致需要多少内存即可满足基本的需求。
如何知道每个任务所需内存的大小呢,这个很难统一的衡量,因为除了数据集本身的开销,还包括算法所需各种临时内存空间的使用,而根据具体的代码算法等不同,临时内存空间的开销也不同。但是数据集本身的大小,对最终所需内存的大小还是有一定的参考意义的。
通常来说每个分区的数据集在内存中的大小,可能是其在磁盘上源数据大小的若干倍(不考虑源数据压缩,Java对象相对于原始裸数据也还要算上用于管理数据的数据结构的额外开销),需要准确的知道大小的话,可以将RDD cache在内存中,从BlockManager的Log输出可以看到每个Cache分区的大小(其实也是估算出来的,并不完全准确)
如: BlockManagerInfo: Added rdd_0_1 on disk on sr438:41134 (size: 495.3 MB)
反过来说,如果你的Executor的数量和内存大小受机器物理配置影响相对固定,那么你就需要合理规划每个分区任务的数据规模,例如采用更多的分区,用增加任务数量(进而需要更多的批次来运算所有的任务)的方式来减小每个任务所需处理的数据大小。

3、spark.storage.memoryFraction(默认 0.6)
指Java堆用于cache的比例
如前面所说spark.executor.memory决定了每个Executor可用内存的大小,而spark.storage.memoryFraction则决定了在这部分内存中有多少可以用于Memory Store管理RDD Cache数据,剩下的内存用来保证任务运行时各种其它内存空间的需要。

spark.executor.memory默认值为0.6,官方文档建议这个比值不要超过JVM Old Gen区域的比值。这也很容易理解,因为RDD Cache数据通常都是长期驻留内存的,理论上也就是说最终会被转移到Old Gen区域(如果该RDD还没有被删除的话),如果这部分数据允许的尺寸太大,势必把Old Gen区域占满,造成频繁的FULL GC。
如何调整这个比值,取决于你的应用对数据的使用模式和数据的规模,粗略的来说,如果频繁发生Full GC,可以考虑降低这个比值,这样RDD Cache可用的内存空间减少(剩下的部分Cache数据就需要通过Disk Store写到磁盘上了),会带来一定的性能损失,但是腾出更多的内存空间用于执行任务,减少Full GC发生的次数,反而可能改善程序运行的整体性能

4、spark.streaming.blockInterval(默认 200)
这个参数用来设置Spark Streaming里Stream Receiver生成Block的时间间隔,默认为200ms。具体的行为表现是具体的Receiver所接收的数据,每隔这里设定的时间间隔,就从Buffer中生成一个StreamBlock放进队列,等待进一步被存储到BlockManager中供后续计算过程使用。理论上来说,为了每个Streaming Batch 间隔里的数据是均匀的,这个时间间隔当然应该能被Batch的间隔时间长度所整除。总体来说,如果内存大小够用,Streaming的数据来得及处理,这个blockInterval时间间隔的影响不大,当然,如果数据Cache Level是Memory+Ser,即做了序列化处理,那么BlockInterval的大小会影响序列化后数据块的大小,对于Java 的GC的行为会有一些影响。

此外spark.streaming.blockQueueSize决定了在StreamBlock被存储到BlockMananger之前,队列中最多可以容纳多少个StreamBlock。默认为10,因为这个队列Poll的时间间隔是100ms,所以如果CPU不是特别繁忙的话,基本上应该没有问题。

5、spark.storage.memoryMapThreshold(默认 2m)
以字节为单位的块大小,用于磁盘读取一个块大小时进行内存映射。这可以防止Spark在内存映射时使用很小块,一般情况下,对块进行内存映射的开销接近或低于操作系统的页大小

三、压缩及序列化相关
1、spark.serializer(默认JavaSerializer)

默认为org.apache.spark.serializer.JavaSerializer,官方建议使用org.apache.spark.serializer.KryoSerializer,当然也可以任意是定义为org.apache.spark.Serializer子类的序化器,实际上只要是org.apache.spark.serializer的子类就可以了,不过如果只是应用,大概你不会自己去实现一个的。
序列化对于spark应用的性能来说,还是有很大影响的,在特定的数据格式的情况下,KryoSerializer的性能可以达到JavaSerializer的10倍以上,当然放到整个Spark程序中来考量,比重就没有那么大了,但是以Wordcount为例,通常也很容易达到30%以上的性能提升。而对于一些Int之类的基本类型数据,性能的提升就几乎可以忽略了。KryoSerializer依赖Twitter的Chill库来实现,相对于JavaSerializer,主要的问题在于不是所有的Java Serializable对象都能支持。
需要注意的是,这里可配的Serializer针对的对象是Shuffle数据,以及RDD Cache等场合,而Spark Task的序列化是通过spark.closure.serializer来配置,但是目前只支持JavaSerializer,所以等于没法配置啦。

2、spark.rdd.compress(默认 false)
这个参数决定了RDD Cache的过程中,RDD数据在序列化之后是否进一步进行压缩再储存到内存或磁盘上。当然是为了进一步减小Cache数据的尺寸,对于Cache在磁盘上而言,绝对大小大概没有太大关系,主要是考虑Disk的IO带宽。而对于Cache在内存中,那主要就是考虑尺寸的影响,是否能够Cache更多的数据,是否能减小Cache数据对GC造成的压力等。

这两者,前者通常不会是主要问题,尤其是在RDD Cache本身的目的就是追求速度,减少重算步骤,用IO换CPU的情况下。而后者,GC问题当然是需要考量的,数据量小,占用空间少,GC的问题大概会减轻,但是是否真的需要走到RDD Cache压缩这一步,或许用其它方式来解决可能更加有效。
所以这个值默认是关闭的,但是如果在磁盘IO的确成为问题或者GC问题真的没有其它更好的解决办法的时候,可以考虑启用RDD压缩。

3、spark.broadcast.compress(默认 true)
是否在发送之前压缩广播变量,默认值为True。

Broadcast机制是用来减少运行每个Task时,所需要发送给TASK的RDD所使用到的相关数据的尺寸,一个Executor只需要在第一个Task启动时,获得一份Broadcast数据,之后的Task都从本地的BlockManager中获取相关数据。在1.1最新版本的代码中,RDD本身也改为以Broadcast的形式发送给Executor(之前的实现RDD本身是随每个任务发送的),因此基本上不太需要显式的决定哪些数据需要broadcast了。
因为Broadcast的数据需要通过网络发送,而在Executor端又需要存储在本地BlockMananger中,加上最新的实现,默认RDD通过Boradcast机制发送,因此大大增加了Broadcast变量的比重,所以通过压缩减小尺寸,来减少网络传输开销和内存占用,通常都是有利于提高整体性能的。
什么情况可能不压缩更好呢,大致上个人觉得同样还是在网络带宽和内存不是问题的时候,如果Driver端CPU资源很成问题(毕竟压缩的动作基本都在Driver端执行),那或许有调整的必要。

4、spark.io.compression.codec(默认LZFCompressionCodec)
RDD Cache和Shuffle数据压缩所采用的算法Codec,默认值曾经是使用LZF作为默认Codec,最近因为LZF的内存开销的问题,默认的Codec已经改为Snappy。

LZF和Snappy相比较,前者压缩率比较高(当然要看具体数据内容了,通常要高20%左右),但是除了内存问题以外,CPU代价也大一些(大概也差20%~50%?)
在用于Shuffle数据的场合下,内存方面,应该主要是在使用HashShuffleManager的时候有可能成为问题,因为如果Reduce分区数量巨大,需要同时打开大量的压缩数据流用于写文件,进而在Codec方面需要大量的buffer。但是如果使用SortShuffleManager,由于shuffle文件数量大大减少,不会产生大量的压缩数据流,所以内存开销大概不会成为主要问题。
剩下的就是CPU和压缩率的权衡取舍,和前面一样,取决于CPU/网络/磁盘的能力和负载,个人认为CPU通常更容易成为瓶颈。所以要调整性能,要不不压缩,要不使用Snappy可能性大一些?
对于RDD Cache的场合来说,绝大多数场合都是内存操作或者本地IO,所以CPU负载的问题可能比IO的问题更加突出,这也是为什么 spark.rdd.compress 本身默认为不压缩,如果要压缩,大概也是Snappy合适一些?


四、运行时相关
1、spark.default.parallelism
(本地模式:机器核数,其他:max(executor的core,2))
2、spark.broadcast.factory(默认 HttpBroadcastFactory)
3、spark.broadcast.blockSize(默认 4096)
TorrentBroadcastFactory块大小(以kb为单位)。过大会降低广播速度;过小会使印象BlockManager性能
4、spark.task.cpus(默认 1)
为每个任务分配的内核数
5、spark.task.maxFailures(默认 4)
Task的最大重试次数
6、spark.cores.max
当应用程序运行在Standalone集群或者粗粒度共享模式Mesos集群时,应用程序向集群请求的最大CPU内 核总数(不是指每台机器,而是整个集群)。如果不设置,对于Standalone集群将使用spark.deploy.defaultCores中数值, 而Mesos将使用集群中可用的内核
7、spark.speculation(默认 false)
此参数设定是否使用推测执行机制,如果设置为true则spark使用推测执行机制,对于Stage中拖后腿的Task在其他节点中重新启动,并将最先完成的Task的计算结果最为最终结果
8、spark.speculation.interval(默认 100)
Spark多长时间进行检查task运行状态用以推测,以毫秒为单位
9、spark.speculation.quantile(默认 0.75)
推测启动前,Stage必须要完成总Task的百分比
10、spark.speculation.multiplier(默认 1.5)
比已完成Task的运行速度中位数慢多少倍才启用推测
11、spark.scheduler.revive.interval(默认 1000)
复活重新获取资源的Task的最长时间间隔(毫秒),发生在Task因为本地资源不足而将资源分配给其他Task运行后进入等待时间,如果这个等待时间内重新获取足够的资源就继续计算
12、spark.deploy.spreadOut(默认 true)
Standalone集群管理器是否自由选择节点还是固定到尽可能少的节点,前者会有更好的数据本地性,后者对于计算密集型工作负载更有效
13、spark.deploy.defaultCores
如果没有设置spark.cores.max,该参数设置Standalone集群分配给应用程序的最大内核数,如果不设置,应用程序获取所有的有效内核。注意在一个共享的集群中,设置一个低值防止攫取了所有的内核,影响他人的使用

五、HistoryServer相关
1、spark.history.updateInterval(默认 10)
以秒为单位,更新日志相关信息的时间间隔
2、spark.history.retainedApplications(250)
保存Application历史记录的个数,如果超过这个值,旧的应用程序信息将被删除
3、spark.history.ui.port(默认 18080)
HistoryServer的web端口
4、spark.eventLog.enabled(默认 false)
是否记录Spark事件
5、spark.eventLog.dir
保存日志相关信息的路径,可以是hdfs://开头的HDFS路径,也可以是file://开头的本地路径,都需要提前创建
6、spark.yarn.historyServer.address
Server端的URL:Ip:port 或者host:port

六、spark-submit工具相关
其位于$SPARK_HOME/bin目录中

Usage: spark-submit [options] <app jar | python file> [app options]
参数名称
含义
--master MASTER_URL
可以是spark://host:port, mesos://host:port, yarn, yarn-cluster,yarn-client, local
--deploy-mode DEPLOY_MODE
Driver程序运行的地方,client或者cluster
--class CLASS_NAME
主类名称,含包名
--name NAME
Application名称
--jars JARS
Driver依赖的第三方jar包
--py-files PY_FILES
用逗号隔开的放置在Python应用程序PYTHONPATH上的.zip, .egg, .py文件列表
--files FILES
用逗号隔开的要放置在每个executor工作目录的文件列表
--properties-file FILE
设置应用程序属性的文件路径,默认是conf/spark-defaults.conf
--driver-memory MEM
Driver程序使用内存大小
--driver-java-options

--driver-library-path
Driver程序的库路径
--driver-class-path
Driver程序的类路径
--executor-memory MEM
executor内存大小,默认1G
--driver-cores NUM
Driver程序的使用CPU个数,仅限于Spark Alone模式
--supervise
失败后是否重启Driver,仅限于Spark Alone模式
--total-executor-cores NUM
executor使用的总核数,仅限于Spark Alone、Spark on Mesos模式
--executor-cores NUM
每个executor使用的内核数,默认为1,仅限于Spark on Yarn模式
--queue QUEUE_NAME
提交应用程序给哪个YARN的队列,默认是default队列,仅限于Spark on Yarn模式
--num-executors NUM
启动的executor数量,默认是2个,仅限于Spark on Yarn模式
--archives ARCHIVES
仅限于Spark on Yarn模式




没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条