分享

一次Spark资源调优的记录

喵十八 2018-8-21 20:07:44 发表于 总结型 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 7 9436
问题导读
1. Spark出现问题如何定位?

2. Spark日志中,各种状态的意义?
3. Spark任务都有哪几个耗时阶段?
4. Spark任务如何提高资源利用率?



关注最新经典文章,欢迎关注公众号


综述
在开发完Spark作业之后,就该为作业配置合适的资源了。Spark的资源参数,基本都可以在spark-submit命令中作为参数设置。很多Spark初学者,通常不知道该设置哪些必要的参数,以及如何设置这些参数,最后就只能胡乱设置,甚至压根儿不设置。资源参数设置的不合理,可能会导致没有充分利用集群资源,作业运行会极其缓慢;或者设置的资源过大,队列没有足够的资源来提供,进而导致各种异常。总之,无论是哪种情况,都会导致Spark作业的运行效率低下,甚至根本无法运行。

本文记录了一次,由于Spark任务阻塞引起的资源调优的过程,叙述了一些在Spark调优过程中会使用的方法,以供大家参考。

背景说明
我们使用第三方的数据和计算资源,这里需要说明,因为某些不可抗的因素,数据和计算资源都不在我们的控制范围内,只能通过特定的方式,将程序更新至第三方机器,通过crontab每天定时起调度。
原始数据按照不同来源,分小时存放。每个来源,每个小时的数据量分布不平衡。大的有数T,小的只有数百G。
我们的Spark任务,主要就是对这些数据数据进行处理,理论上,一个小时的数据起一个Spark任务。所有任务串行。这个任务串,我们称之一个任务list。

集群使用FAIR策略调度,一共分配给我们1000 vcore,4000g内存 的计算资源。且没有开启动态调整。这个是硬性规定,我们无法调整。


业务的主要逻辑
Step1. 对原始数据,根据需求提取数据,并参照维表进行Mapping操作。例如 原始记录为15个字段。提取了其中的OriginField1,OriginField2,OriginField3 3个字段,并对OriginField2字段进行Mapping。

映射维表类似如下格式
[mw_shl_code=text,true]a1 -> A
a2 -> A
b1 -> B
b2 -> B
b3 -> B
c1 -> C[/mw_shl_code]
这一步的目的是,通过维表对原始数据进行一个收缩。并进行归并。
如 原始数据为
[mw_shl_code=text,true]key1,a1,1
key1,a2,1
key2,a1,2
key2,b1,3[/mw_shl_code]

Mapping 之后的结果为
[mw_shl_code=text,true]key1,A,1
key1,A,1
key2,A,2
key2,B,3[/mw_shl_code]

再以(OriginField1,OriginField2)为key,进行reduce操作,作为输出。
[mw_shl_code=text,true]key1,A,2
key2,A,2
key2,B,3[/mw_shl_code]

Step2. 对Mapping并进行然后根据Mapping后的结果,按天段进行聚合,进一步缩小数据量。

Step3. 对按天聚合的结果,注册为临时表,使用SparkSQL 进行取数操作。


运行情况概述
每天只能在固定时间,通过crontab调脚本,起任务。每天起两个任务理想情况,当天起的任务能在48小时内结束,也就是说,大部分情况,整个集群会存在2个任务list并行的情况。

情况如下图
任务调度.png

问题发现与解决
问题出现
为描述方便,将连续4天定义为day1,day2,day3,day4
正常情况,在day3的任务list启动时,day1的任务list应该已经结束。但实际情况是,在day3的任务list启动时,day1的任务list还在运行。并且直到day4任务list启动时,还没有结束。也就是同时存在4个任务list并行的情况。

问题分析
当问题出现之后,便协调我们开发进行排查。但是非常尴尬的是,没有任何以往的情况,可以用来分析。
(这里不得不吐槽下我们的运维,真的是恨不得什么都开发写完,他只用敲个sh run.sh,当然我没有任何看不起运维这个职业的意思,docker 也是一群运维大神搞的,devops 也是很棒的一个概念。我想吐槽的只是那些只知道sh run.sh 的运维。)


收集数据
工欲善其事,必先利其器。既然没有数据,那第一步就先开始收集数据。我们的任务都是yarn-cluster模式运行的。Spark日志需要另外申请,能获取到的,也只有nohup.out中的日志,也就是如下形式。

[mw_shl_code=text,true]18/08/20 11:56:25 INFO impl.YarnClientImpl: Submitted application application_1534428021940_5843
18/08/20 11:56:26 INFO yarn.Client: Application report for application_1534428021940_5843 (state: ACCEPTED)
18/08/20 11:56:26 INFO yarn.Client:
         client token: N/A
         diagnostics: N/A
         ApplicationMaster host: N/A
         ApplicationMaster RPC port: -1
         queue: root.bigdata_dev_dashuju
         start time: 1534737385732
         final status: UNDEFINED
         tracking URL: http://namenode3:8088/proxy/application_1534428021940_5843/
         user: bugeinikan
18/08/20 11:56:27 INFO yarn.Client: Application report for application_1534428021940_5843 (state: ACCEPTED)
18/08/20 11:56:28 INFO yarn.Client: Application report for application_1534428021940_5843 (state: ACCEPTED)
18/08/20 11:56:29 INFO yarn.Client: Application report for application_1534428021940_5843 (state: RUNNING)
18/08/20 11:56:29 INFO yarn.Client:
         client token: N/A
         diagnostics: N/A
         ApplicationMaster host: 192.168.111.161
         ApplicationMaster RPC port: 0
         queue: root.bigdata_dev_dashuju
         start time: 1534737385732
         final status: UNDEFINED
         tracking URL: http://namenode3:8088/proxy/application_1534428021940_5843/
         user: bugeinikan
18/08/20 11:56:30 INFO yarn.Client: Application report for application_1534428021940_5843 (state: RUNNING)
18/08/20 11:56:31 INFO yarn.Client: Application report for application_1534428021940_5843 (state: RUNNING)
.
.
.
18/08/20 11:57:05 INFO yarn.Client: Application report for application_1534428021940_5843 (state: FINISHED)[/mw_shl_code]

由于我们并不是要解决Spark程序报错,只是优化调度,这个日志对我们而言,足够了。
这个日志中包含了很多重要的信息:任务的submit时刻,任务accepted的时刻,任务开始running的时刻,任务finished的时刻。
由这四个时刻,可以计算出任务的submit,accepted,running三个部分的耗时。
写个代码,遍历了一遍日志就获取到了我想要的信息。如下表

日期submit平均耗时accepted平均耗时running平均耗时
20180720
9.41
278.67
784.11
20180721
9.62
298.81
643.48
20180722
9.81
269.29
674.23
20180723
9.09
60.82
563.43
20180724
10.39
41.66
654.18
20180725
13.59
22.77
624.83



这里说明下Spark中,这几个时间的含义:
SUBMIT 耗时:Spark任务提交到集群的时候,存在环境变量检查,jvm启动等过程,感兴趣的可以看这篇文章Spark源码解析之任务提交(spark-submit)篇。总结一下,就是这部分耗时基本不变,且难以优化。(除非减少任务数量)
ACCEPTED 耗时:任务提交成功之后,便会进入accepted状态,如果集群资源足够,那么就可以马上分配资源进如RUNNING状态,如果资源存在竞争,申请不到启动任务所需的最小资源数,就处于等待状态。可以看到,我们的任务存在非常严重资源竞争情况,等待时间过长。
RUNNING 耗时:这部分就是任务运行耗时了。


日志分析结果
从上一节收集的数据,可以发现我们任务慢的一个原因就是等待时间过长。也就是资源竞争造成的。我们之前的预估,是给每个任务list 500个vcore的计算资源,按理来说,不应出现竞争情况。于是便找运维要来了提交脚本。很快就发现了原因。
提交脚本部分如下:
[mw_shl_code=shell,true]
spark-submit --master yarn-cluster --name XXX --driver-memory 6g --executor-memory 6g  --num-executors 500[/mw_shl_code]

好吧,每个executor分了6g 的内存,500 * 6g * 2 = 6000g 已经超过了4000g的分配资源,自然存在竞争。(好吧,以后这个run.sh还是我们来写吧 XD)



改进及前后数据对比
所以,我们做的修改,只是修改了一个提交参数。然后任务list就恢复了正常。
从上表中,也可以发现前后的结果对比,我们是0723开始调整的,之后0724,0725的等待时间比之前大幅缩短。

后来也问了运维为啥配置给了6g,答曰:习惯。。。。。。


进一步的优化
本着精益求精的方式,当然需要谋求进一步的优化。因为各个数据源中,数据量的不同,存在这样的情况,有的小时文件片数只有300片,如果分配500个executor,根本无法利用满这500个executor。
因而通过统计不同数据源的文件片数,将任务list进行拆分,将每小时文件片数在200片以下的放到一个队列,将每小时文件片数在200以上的放到另一个队列。提高了资源的利用率,使得当天启动的任务list能够在24小时内跑完。调整之后,平均每片文件的运行时间如下:

日期submit平均耗时accepted平均耗时running平均耗时平均每片文件耗时
20180720
9.41
278.67
784.11
0.81
20180721
9.62
298.81
643.48
0.78
20180722
9.81
269.29
674.23
0.56
20180723
9.09
60.82
563.43
0.62
20180724
10.39
41.66
654.18
0.69
20180725
13.59
22.77
624.83
0.60
20180726
14.90
25.48
415.99
0.61
20180727
9.82
17.07
331.34
0.45
20180728
9.13
16.56
281.15
0.52
降低了平均每片文件的运行耗时,提升了整体的资源利用率。

总结
本文记录了一次Spark资源调度调优的过程。当运行出现问题的时候,需要从日志中分析出原因,这是一个很重要的技能。此外,想要调优,也必须先从日志中收集数据,基于数据说话,才能更好的发现问题,与定位问题。





已有(7)人评论

跳转到指定楼层
jiangzi 发表于 2018-8-22 19:56:12
一次Spark资源调优的记录~~
回复

使用道具 举报

似水流_vKwkA 发表于 2018-8-23 16:58:12
请教一下,每个executor分了6g 的内存,500 * 6g * 2     这个2是什么意思啊?
回复

使用道具 举报

似水流_vKwkA 发表于 2018-8-23 17:54:43
本帖最后由 似水流_vKwkA 于 2018-8-23 18:00 编辑
pig2 发表于 2018-8-23 17:32
--driver-memory 6g --executor-memory 6g  --num-executors 500
这里特指上面
--driver-memory 6g --executor-memory 6g  --num-executors 500
500 * 6g * 2


你好,num-executors *executor-memory *2 意思是指 提交了两个相同的spark任务吗


回复

使用道具 举报

s060403072 发表于 2018-8-23 18:23:36
似水流_vKwkA 发表于 2018-8-23 17:54
--driver-memory 6g --executor-memory 6g  --num-executors 500
500 * 6g * 2

一个是driver内存,一个是每个excutor的内存。
是同一个任务的内存分配。

看下图:

回复

使用道具 举报

喵十八 发表于 2018-8-27 19:28:01
似水流_vKwkA 发表于 2018-8-23 16:58
请教一下,每个executor分了6g 的内存,500 * 6g * 2     这个2是什么意思啊?

是这样的,最开始的版本,集群是两个队列的任务并行,也就是说,同一时刻有2个Spark任务在跑。按照理论值,单个任务占用的内存为500*6 (每个executor 占用6g,共500个executor)。因为两个任务并行,就*2
回复

使用道具 举报

似水流_vKwkA 发表于 2018-8-27 20:43:46
喵十八 发表于 2018-8-27 19:28
是这样的,最开始的版本,集群是两个队列的任务并行,也就是说,同一时刻有2个Spark任务在跑。按照理论值 ...

学习了 感谢各位大佬
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条