分享

Flink SQL作业调优:手动配置调优

问题导读:
1.手动配置调优的内容主要有哪3种类型?
2.作业参数调优的优缺点?
3.资源调优的优缺点?

上一篇
Flink SQL开发指南:AutoConf自动配置调优

本文为您介绍实时计算作业手动配置调优。

手动配置调优 {#section_q1g_lhm_bgb .section}
手动配置调优的内容主要有3种类型:
  • 作业参数调优:主要对作业中的miniBatch等参数进行调优;
  • 资源调优:主要对作业中的Operator的并发数(arallelism)、CPU(Core)、堆内存(heap_memory)等参数进行调优;
  • 上下游参数调优:主要对作业中的上下游存储参数进行调优。
下面通过3个章节对以上3类调优进行介绍,参数调优后将生成新的配置,JPb需重新上线启动/恢复才能使用新的配置,本文最后章节将讲述如何重新启用新的配置。

作业参数调优 {#section_sxj_yjm_bgb .section}
miniBatch设置:该设置只能优化group by。Flink SQL纯流模式下,每来一条数据都会去操作state,io消耗较大,设置miniBatch后,同一个key的一批数据只访问一次state,且只输出最新的一条数据,即减少了state访问也减少了向下游的数据更新。miniBatch设置如下:如果是新增加的作业参数建议用户停止重启,如果是改变作业参数大小暂停恢复即可。
  1. # excatly-once语义
  2. blink.checkpoint.mode=EXACTLY_ONCE
  3. # checkpoint间隔时间,单位毫秒
  4. blink.checkpoint.interval.ms=180000
  5. blink.checkpoint.timeout.ms=600000
  6. # 2.x使用niagara作为statebackend,以及设定state数据生命周期,单位毫秒
  7. state.backend.type=niagara
  8. state.backend.niagara.ttl.ms=129600000
  9. # 2.x开启5秒的microbatch(窗口函数不要设置这个参数。)
  10. blink.microBatch.allowLatencyMs=5000
  11. # 表示整个job允许的延迟
  12. blink.miniBatch.allowLatencyMs=5000
  13. # 单个batch的size
  14. blink.miniBatch.size=20000
  15. # local 优化,2.x默认已经开启,1.6.4需手动开启
  16. blink.localAgg.enabled=true
  17. # 2.x开启partial优化,解决count distinct热点
  18. blink.partialAgg.enabled=true
  19. # union all 优化
  20. blink.forbid.unionall.as.breakpoint.in.subsection.optimization=true
  21. # GC 优化(SLS做源表不能设置。)
  22. blink.job.option=-yD heartbeat.timeout=180000 -yD env.java.opts='-verbose:gc -XX:NewRatio=3 -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:ParallelGCThreads=4'
  23. # 时区设置
  24. blink.job.timeZone=Asia/Shanghai
复制代码
a1.png




资源调优 {#section_dj3_clm_bgb .section}
  • 分析问题
    • 通过job的拓扑图查看到2号的TASK节点的输入队列已达到100%造成数据堆积反压到它的上游1号的TASK节点,输出队列造成数据堆积。
a2.png

        2.单击2号的TASK节点,找到队列已达到100%的TaskExecutor。

a3.png


a4.png


       3.查看TaskExecutor的CPU和内存的使用量,根据使用量来调大相应的CPU和MEM。

a5.png



  • 性能调优

    • 进入调优窗口






    • a6.png
    •   打开可视化编辑
a7.png







    • 找到2号task对应的Group(若有)或Operator,可以按Group批量修改或对单个Operator进行参数修改。
      • 按Group批量修改:







    a8.png








    a9.png

    • 单个Operator修改:
a10.png
















    a11.png









    • 找到对应的Operator进行修改:
a12.png








a13.png








    • 配置参数后点击右上角的应用当前配置并关闭窗口,即可保存当前配置。 说明:
  • 如果在调优的过程中发现虽然调大了某个Group的资源配置但是并没有太大的效果,首先需要确认该节点是否有数据倾斜现象,如果有数据倾斜,首要解决数据倾斜;其次如果没有数据倾斜需要把其中复杂运算的Operator节点(如:group by、window、join等)拆开,来判断到底是哪个子节点有异常。如果找到了该子节点只需调优该节点就好。将Operator子节点拆开的方法:点击需要修改的Operator,将其参数chainingStrategy修改为HEAD,若其已为HEAD,需要将其后面的第一个Operator修改为HEAD。 chainingStrategy三个参数的解释如下:
    • ALWAYS:代表把单个的节点合并成一个大的GROUP里面。
    • NEVER:保持不变
    • HEAD :表示把在合并成一个GROUP里面单个节点查分出来。
  • 资源参数的配置原则和建议
    • 可调参数
      • parallelism
        • source
          说明: source的并发不能大于source的shard数。
          • 资源根据上游Partition数来。
          • 例如SOURCE的个数是16,那么source的并发可以配置为16、8或4等,不要超过16。
        • 中间的处理节点
          • 根据预估的QPS计算。
          • 对于小任务来说,和source一样的并发度就够了。
          • QPS高的任务,可以配大点,例如 64,128 或者 256。
        • sink节点
          • 并发度和下游存储的Partition数相关,一般是下游Partition个数的2~3倍。
          • 如果配置太大会导致写超时或失败。例如下游SINK的个数是16,那么建议sink的并发最大可以配置48。
      • CORE
        CPU,默认 0.1,根据实际CPU使用配置(但最好能被1整除),一般建议0.25
      • heap_memory
        堆内存,默认 256MB,根据实际内存使用配置 点击GROUP就可以编辑以上参数。
    • 存在GROUP BY的TASK节点可配置参数
      state_size:state大小,默认0。如果operator有用state,需要把state_size配成1,表示该operator会用state,job在申请资源的时候会额外为该operator申请内存,供state访问使用;如果不配成1,job可能被yarn kill。(state_size需要配成1的operator有:group by、 join、over和window)虽然有这么多配置项,对普通用户来说,只需要关心:core,parallelism和heap_memory。整个job 建议core:mem=1:4,即一个核对应4G内存。
      说明:
      调parallelism和memory,有什么规则吗?
      一个operator的总CU=并发*core 一个operator的总mem=并发*heap_mem 一个group中的core取最大值,mem取各个operator的sum,CPU和MEM的关系是1:4的关系;比如您的core给的是1CU,mem给的是3G,那么最终分配的是1CU+4G。您的core给的是1CU,mem给的是5G,那么最终分配的是1.25CU+5G。

上下游参数调优 {#section_wml_snm_bgb .section}
由于实时计算的特性,每条数据均会触发上下游存储的读写,会对上下游存储形成性能压力,可以通过设置batchsize,批量的读写上下游存储数据来降低对上下游存储的压力,支持batchsize参数的上下游存储如下:
|名称|参数|详情|设置参数值| |DATAHUB源表|batchReadSize|单次读取条数|可选,默认为10| |DATAHUB结果表|batchSize|单次写入条数|可选,默认为300。| |日志服务(Log Service)源表|batchGetSize|单次读取logGroup条数|可选,默认为10。| |分析型数据库(AnalyticDB)结果表|batchSize|每次写的批次大小|可选,默认为1000。| |云数据库(RDS)结果表|batchSize|每次写的批次大小|可选,默认为50。| |云数据库HybridDB for MySQL(petaData)结果表|batchSize|每次写的批次大小|可选,默认值1000 ,表示每次写多少条,经验建议最大设置4096。| |bufferSize|去重的buffer大小,需要指定主键才生效。|可选。设置batchSize必须设置bufferSize,经验建议最大设置4096。|
示例:
a14.png



重新启用新的配置 {#section_isd_5nm_bgb .section}
通过1-3章节完成配置后,需要重新启动/恢复作业才能使新配置生效。
1.上线作业,配置方式必须选择使用上次资源配置 。

a15.png

2.暂停原作业。
a16.png

3.恢复原作业。
a17.png

4.选择按最新配置恢复, 否则新配置无法生效
a18.png

5.重新恢复后,可通过运维 > 运行信息 > Vertex拓扑查看新的配置是否生效。

说明:
一般情况下我们不建议采用先停止job再启动的方式使新配置生效,因为job停止后status状态会消除,可能会导致计算结果不一致。

本文档涉及到的相关名词解释 {#section_s12_c4m_bgb .section}
  • global
    • isChainingEnabled :表示是否启用chain策略,默认为 true,不需要修改。
  • nodes
    • id:节点id号,自动生成,唯一,不需要修改。
    • uid: 节点uid号,用于计算operator id,如果不设置,会使用id。
    • pact:节点类型,例如Data Source,Operator,Data Sink等等,不需要修改。
    • name:节点名字,用户可以自定义。
    • slotSharingGroup:default,不需要修改。
    • chainingStrategy:chain的策略,有 HEAD、ALWAYS和NEVER,根据需要修改。
    • parallelism:并发度,默认为1,可以根据实际数据量改大点。
    • core:CPU,默认0.1,根据实际CPU使用配置(但最好能被1整除),一般建议0.25。
    • heap_memory:堆内存,默认256MB,根据实际内存使用配置。
    • direct_memory:jvm堆外内存,默认0,建议不要修改。
    • native_memory:jvm堆外内存,jni使用,默认0,建议用10MB。
  • chain
    • Flink SQL任务是一个DAG图,会有很多个节点(Operator),有些上下游的节点在运行时是可以合成一个点的,这称之为chain。对于chain之后的点,CPU取最大的最大值,内存取总和。例如Node1如果operator有用state,Node2{128MB,0.5core},Node3{128MB,0.25core},那么这三个点chain后的CPU是 0.5core,内存是 512MB。chain的规则简单来说就是:并发度需要一样。但是,有些节点之间是不能合在一起的,比如groupBy。一般来说,尽可能的让节点都chain在一起,减少网络传输。

最新经典文章,欢迎关注公众号



没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条