Reading guide:
1. How do you configure the cleaning policy?
2. How do you schedule a Clustering plan?
3. How do you perform file clustering?
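Hudi test: cluster files after batch processing, then attach the stream
This article describes in detail how to run a file Clustering operation after batch processing and before stream processing. Clustering merges a large number of small files into a handful of large ones, which keeps small files from piling up.
Clustering after the batch load involves the following steps, most of which are run with spark-submit:
Batch processing completes
First, run the batch job in bulk_insert mode. Note that all of the operations below take place after the batch job has finished and before the stream is attached.
Inspecting the table's HDFS path shows that, because the data was written with bulk_insert, there are a great many files and each one is very small. We want to merge the 1,000-plus small files in each partition into a few large files to avoid unnecessary query and maintenance overhead. The columns of hdfs dfs -count are directory count, file count, content size in bytes, and path.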
- [hadoop@p0-tklfrna-tklrna-device02 hudi_clustering]$ hdfs dfs -count /flk_hudi/chdrpf_hudi_test03/*
- 7 7 32637997 /flk_hudi/chdrpf_hudi_test03/.hoodie
- 1 1067 571117942 /flk_hudi/chdrpf_hudi_test03/1
- 1 1071 716513820 /flk_hudi/chdrpf_hudi_test03/2
- 1 1072 644997032 /flk_hudi/chdrpf_hudi_test03/3
- 1 1072 507397985 /flk_hudi/chdrpf_hudi_test03/4
- 1 1069 730774472 /flk_hudi/chdrpf_hudi_test03/5
- 1 1067 586561261 /flk_hudi/chdrpf_hudi_test03/6
- 1 1063 557377359 /flk_hudi/chdrpf_hudi_test03/7
- 1 1070 483416155 /flk_hudi/chdrpf_hudi_test03/8
- 1 1071 587965407 /flk_hudi/chdrpf_hudi_test03/A
- 1 1071 570651877 /flk_hudi/chdrpf_hudi_test03/B
- 1 1068 796163049 /flk_hudi/chdrpf_hudi_test03/C
- 1 1064 732633320 /flk_hudi/chdrpf_hudi_test03/D
- 1 1067 524777141 /flk_hudi/chdrpf_hudi_test03/E
- 1 1070 550302848 /flk_hudi/chdrpf_hudi_test03/F
- 1 1076 540059544 /flk_hudi/chdrpf_hudi_test03/G
- 1 1071 590094172 /flk_hudi/chdrpf_hudi_test03/H
- 1 1076 505755100 /flk_hudi/chdrpf_hudi_test03/I
- 1 1068 606771875 /flk_hudi/chdrpf_hudi_test03/J
- 1 1068 495261290 /flk_hudi/chdrpf_hudi_test03/K
- 1 1067 516964732 /flk_hudi/chdrpf_hudi_test03/L
- 1 1060 482056347 /flk_hudi/chdrpf_hudi_test03/M
- 1 1054 607625266 /flk_hudi/chdrpf_hudi_test03/N
- 1 1077 551989638 /flk_hudi/chdrpf_hudi_test03/O
- 1 1076 590537140 /flk_hudi/chdrpf_hudi_test03/P
- 1 1069 536362956 /flk_hudi/chdrpf_hudi_test03/Q
- 1 1072 559723804 /flk_hudi/chdrpf_hudi_test03/R
- 1 1067 546042696 /flk_hudi/chdrpf_hudi_test03/S
- 1 1059 528438508 /flk_hudi/chdrpf_hudi_test03/T
- 1 1063 518288413 /flk_hudi/chdrpf_hudi_test03/U
- 1 1070 543146873 /flk_hudi/chdrpf_hudi_test03/V
- 1 1066 532588113 /flk_hudi/chdrpf_hudi_test03/W
- 1 1069 494606809 /flk_hudi/chdrpf_hudi_test03/X
- 1 1079 527128056 /flk_hudi/chdrpf_hudi_test03/Y
- 1 1068 477378497 /flk_hudi/chdrpf_hudi_test03/Z
- 1 1075 471848267 /flk_hudi/chdrpf_hudi_test03/a
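Next, check the total number of files under the table's HDFS path. bulk_insert has produced a very large number of small files (37,452 files for about 22.3 GB), which significantly hurts query performance: a single query may require several thousand I/O operations.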
- [hadoop@p0-tklfrna-tklrna-device02 hudi_clustering]$ hdfs dfs -count /flk_hudi/chdrpf_hudi_test03/
- 43 37452 22269590565 /flk_hudi/chdrpf_hudi_test03
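To put a number on the small-file problem, the count listing above can be reduced to an average file size per partition. The following is a minimal sketch, not part of the original test; the function name summarize_count is our own, and it assumes the standard four-column output of hdfs dfs -count.

```shell
#!/usr/bin/env bash
# Summarize `hdfs dfs -count` output read from stdin.
# Input columns:  DIR_COUNT FILE_COUNT CONTENT_SIZE PATHNAME
# Output columns: PATHNAME FILE_COUNT AVG_FILE_SIZE_KIB (truncated to an integer)
summarize_count() {
  awk 'NF == 4 && $2 > 0 {
    printf "%s %d %d\n", $4, $2, $3 / $2 / 1024
  }'
}

# Hypothetical usage against the table in this article:
#   hdfs dfs -count '/flk_hudi/chdrpf_hudi_test03/*' | summarize_count
```

For partition 1 in the listing above (1067 files, 571117942 bytes) this reports an average of about 522 KiB per file, far below a typical HDFS block size.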
Clustering
Configure the Clustering strategy
The minimal configuration is as follows:
- [hadoop@p0-tklfrna-tklrna-device02 hudi_clustering]$ cat /home/hadoop/hudi_clustering/clusteringjob.properties
- hoodie.clustering.inline.max.commits=2
- hoodie.clustering.plan.strategy.max.num.groups=40
With advanced options added:
- [hadoop@p0-tklfrna-tklrna-device02 ~]$ cat /home/hadoop/hudi_clustering/clusteringjob.properties
- hoodie.clustering.inline=true
- hoodie.clustering.inline.max.commits=2
- hoodie.clustering.plan.strategy.max.num.groups=40
- hoodie.clustering.plan.strategy.target.file.max.bytes=1073741824
- hoodie.clustering.plan.strategy.max.bytes.per.group=2147483648
- hoodie.clustering.plan.strategy.small.file.limit=629145600
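The byte values in the advanced configuration are easier to read in MiB. As we understand these settings, target.file.max.bytes is the target size of each output file, max.bytes.per.group caps the data placed in one clustering group, and small.file.limit marks files below that size as clustering candidates. The sketch below converts the values and gives a rough lower bound on output files per partition; the helper names are our own, and the estimate assumes files are clustered within a partition and written up to the target size.

```shell
#!/usr/bin/env bash
# Convert a byte count to whole MiB (integer division).
to_mib() { echo $(( $1 / 1024 / 1024 )); }

# Rough lower bound on output files for one partition: ceil(bytes / target).
# Assumption: actual Hudi output also depends on grouping and compression.
min_output_files() {
  local partition_bytes=$1 target_bytes=$2
  echo $(( (partition_bytes + target_bytes - 1) / target_bytes ))
}

to_mib 1073741824   # hoodie.clustering.plan.strategy.target.file.max.bytes -> 1024
to_mib 2147483648   # hoodie.clustering.plan.strategy.max.bytes.per.group  -> 2048
to_mib 629145600    # hoodie.clustering.plan.strategy.small.file.limit     -> 600
min_output_files 571117942 1073741824   # partition 1 before clustering    -> 1
```

With a 1 GiB target and partitions of roughly 0.5 to 0.8 GB, a single output file per partition is the expected order of magnitude.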
Schedule
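Schedule a Clustering plan. Once the plan has been scheduled, an instant with the corresponding timestamp appears in the table's Timeline on HDFS; that timestamp is used to execute the plan.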
- spark-submit \
- --master yarn \
- --class org.apache.hudi.utilities.HoodieClusteringJob \
- hdfs://nameservice1/utility_jars/hudi-utilities-bundle_2.12-0.10.0.jar \
- --schedule \
- --base-path hdfs://nameservice1/flk_hudi/chdrpf_hudi_test03 \
- --table-name chdrpf_hudi_test03 \
- --props file:///home/hadoop/hudi_clustering/clusteringjob.properties \
- --spark-memory 16g \
- > /home/hadoop/hudi_clustering/clusteringjob.log 2>&1
List the Hudi Timeline on HDFS to get the timestamp. The instant we need is the one whose file name ends in replacecommit.requested. Copy the timestamp, 20220826105913373 here, for use in the next step.
- [hadoop@p0-tklfrna-tklrna-device02 ~]$ hdfs dfs -ls /flk_hudi/chdrpf_hudi_test03/.hoodie/
- Found 407 items
- drwxr-xr-x - hadoop supergroup 0 2022-08-26 10:10 /flk_hudi/chdrpf_hudi_test03/.hoodie/.aux
- drwxr-xr-x - hadoop supergroup 0 2022-08-26 14:53 /flk_hudi/chdrpf_hudi_test03/.hoodie/.temp
- -rw-r--r-- 3 hadoop supergroup 18596070 2022-08-26 10:14 /flk_hudi/chdrpf_hudi_test03/.hoodie/20220826101036547.commit
- -rw-r--r-- 3 hadoop supergroup 0 2022-08-26 10:10 /flk_hudi/chdrpf_hudi_test03/.hoodie/20220826101036547.commit.requested
- -rw-r--r-- 3 hadoop supergroup 0 2022-08-26 10:10 /flk_hudi/chdrpf_hudi_test03/.hoodie/20220826101036547.inflight
- -rw-r--r-- 3 hadoop supergroup 14041389 2022-08-26 10:16 /flk_hudi/chdrpf_hudi_test03/.hoodie/20220826101404432.commit
- -rw-r--r-- 3 hadoop supergroup 0 2022-08-26 10:14 /flk_hudi/chdrpf_hudi_test03/.hoodie/20220826101404432.commit.requested
- -rw-r--r-- 3 hadoop supergroup 0 2022-08-26 10:14 /flk_hudi/chdrpf_hudi_test03/.hoodie/20220826101404432.inflight
- ...
- -rw-r--r-- 3 hadoop supergroup 5685565 2022-08-26 10:59 /flk_hudi/chdrpf_hudi_test03/.hoodie/20220826105913373.replacecommit.requested
- ...
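With several hundred entries in .hoodie, picking the timestamp out by eye is error-prone. The following sketch filters the listing for clustering instants that have been requested but not yet completed. The function name pending_clustering_instants is hypothetical, and the logic assumes the file naming seen in the listing above (a .replacecommit.requested file for the plan, a plain .replacecommit file once it has completed).

```shell
#!/usr/bin/env bash
# Read `hdfs dfs -ls <base>/.hoodie` output on stdin and print the timestamps
# that have a .replacecommit.requested file but no completed .replacecommit.
pending_clustering_instants() {
  awk '
    {
      n = split($NF, parts, "/")          # last field is the full path
      name = parts[n]                     # file name inside .hoodie
      if (name ~ /^[0-9]+\.replacecommit\.requested$/) {
        sub(/\..*$/, "", name); requested[name] = 1
      } else if (name ~ /^[0-9]+\.replacecommit$/) {
        sub(/\..*$/, "", name); completed[name] = 1
      }
    }
    END {
      for (ts in requested) if (!(ts in completed)) print ts
    }
  ' | sort
}

# Hypothetical usage:
#   hdfs dfs -ls /flk_hudi/chdrpf_hudi_test03/.hoodie/ | pending_clustering_instants
```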
Execute
To execute the Clustering plan, pass the timestamp from the previous step to the command as --instant-time 20220826105913373.
- spark-submit \
- --master yarn \
- --class org.apache.hudi.utilities.HoodieClusteringJob \
- hdfs://nameservice1/utility_jars/hudi-utilities-bundle_2.12-0.10.0.jar \
- --instant-time 20220826105913373 \
- --base-path hdfs://nameservice1/flk_hudi/chdrpf_hudi_test03 \
- --table-name chdrpf_hudi_test03 \
- --props file:///home/hadoop/hudi_clustering/clusteringjob.properties \
- --spark-memory 16g \
- > /home/hadoop/hudi_clustering/clusteringjob_execution.log 2>&1
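After file clustering completes, the listing shows one additional file in each clustered partition, while the old small files are still present: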
- [hadoop@p0-tklfrna-tklrna-device02 hudi_clustering]$ hdfs dfs -count /flk_hudi/chdrpf_hudi_test03/*
- 7 10 39759457 /flk_hudi/chdrpf_hudi_test03/.hoodie
- 1 1068 644693330 /flk_hudi/chdrpf_hudi_test03/1
- 1 1072 912384991 /flk_hudi/chdrpf_hudi_test03/2
- 1 1073 783040567 /flk_hudi/chdrpf_hudi_test03/3
- 1 1073 535431665 /flk_hudi/chdrpf_hudi_test03/4
- 1 1070 938545286 /flk_hudi/chdrpf_hudi_test03/5
- 1 1068 676230669 /flk_hudi/chdrpf_hudi_test03/6
- 1 1064 625387487 /flk_hudi/chdrpf_hudi_test03/7
- 1 1071 494572949 /flk_hudi/chdrpf_hudi_test03/8
- 1 1072 675599389 /flk_hudi/chdrpf_hudi_test03/A
- 1 1072 643710911 /flk_hudi/chdrpf_hudi_test03/B
- 1 1069 1056860522 /flk_hudi/chdrpf_hudi_test03/C
- 1 1065 940690081 /flk_hudi/chdrpf_hudi_test03/D
- 1 1068 563929957 /flk_hudi/chdrpf_hudi_test03/E
- 1 1071 606406555 /flk_hudi/chdrpf_hudi_test03/F
- 1 1077 589463777 /flk_hudi/chdrpf_hudi_test03/G
- 1 1072 682564783 /flk_hudi/chdrpf_hudi_test03/H
- 1 1077 529816271 /flk_hudi/chdrpf_hudi_test03/I
- 1 1069 712917512 /flk_hudi/chdrpf_hudi_test03/J
- 1 1069 514668751 /flk_hudi/chdrpf_hudi_test03/K
- 1 1068 550874973 /flk_hudi/chdrpf_hudi_test03/L
- 1 1061 495250431 /flk_hudi/chdrpf_hudi_test03/M
- 1 1055 716887761 /flk_hudi/chdrpf_hudi_test03/N
- 1 1078 612144859 /flk_hudi/chdrpf_hudi_test03/O
- 1 1077 679350316 /flk_hudi/chdrpf_hudi_test03/P
- 1 1070 586176818 /flk_hudi/chdrpf_hudi_test03/Q
- 1 1073 625760986 /flk_hudi/chdrpf_hudi_test03/R
- 1 1068 603042997 /flk_hudi/chdrpf_hudi_test03/S
- 1 1060 576062292 /flk_hudi/chdrpf_hudi_test03/T
- 1 1064 555764103 /flk_hudi/chdrpf_hudi_test03/U
- 1 1071 598050377 /flk_hudi/chdrpf_hudi_test03/V
- 1 1066 532588113 /flk_hudi/chdrpf_hudi_test03/W
- 1 1069 494606809 /flk_hudi/chdrpf_hudi_test03/X
- 1 1079 527128056 /flk_hudi/chdrpf_hudi_test03/Y
- 1 1068 477378497 /flk_hudi/chdrpf_hudi_test03/Z
- 1 1075 471848267 /flk_hudi/chdrpf_hudi_test03/a
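To see at a glance which partitions the clustering job touched, the before and after listings can be compared. This is a small sketch under our own assumptions: both listings were saved to local files (the file names in the usage comment are hypothetical), and the function name count_delta is ours.

```shell
#!/usr/bin/env bash
# Compare two saved `hdfs dfs -count` listings (before, after) and print
# "PATHNAME FILE_DELTA BYTE_DELTA" for every path whose numbers changed.
count_delta() {
  local before=$1 after=$2
  awk '
    NF != 4 { next }                                   # skip non-data lines
    NR == FNR { files[$4] = $2 + 0; bytes[$4] = $3 + 0; next }   # first file
    ($4 in files) && (files[$4] != $2 + 0 || bytes[$4] != $3 + 0) {
      printf "%s %+d %+.0f\n", $4, $2 - files[$4], $3 - bytes[$4]
    }
  ' "$before" "$after"
}

# Hypothetical usage:
#   hdfs dfs -count '/flk_hudi/chdrpf_hudi_test03/*' > before.txt   # before clustering
#   hdfs dfs -count '/flk_hudi/chdrpf_hudi_test03/*' > after.txt    # after clustering
#   count_delta before.txt after.txt
```

On the two listings in this article, partition 1 shows up with +1 file and +73575388 bytes, while partitions W through a produce no output because their numbers are unchanged.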
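Run cleaning
After Clustering, the data from many small files has been rewritten into large files. Because Hudi does not delete these expired, no-longer-needed files on its own here, we run the cleaner manually with an explicit cleaning policy to remove them.
Cleaning policy configuration file: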
- [hadoop@p0-tklfrna-tklrna-device02 hudi_clustering]$ cat /home/hadoop/hudi_clustering/hudi_cleaning.properties
- # hudi_cleaning.properties
-
- # When enabled, the cleaner table service is invoked immediately after each commit, to delete older file slices
- hoodie.clean.automatic=true
-
- # Only applies when hoodie.clean.automatic is turned on.
- # When turned on runs cleaner async with writing, which can speed up overall write performance.
- hoodie.clean.async=true
-
- # KEEP_LATEST_COMMITS retains the file slices needed by the most recent N commits,
- # where N is hoodie.cleaner.commits.retained.
- hoodie.cleaner.policy=KEEP_LATEST_COMMITS
-
- # Number of commits to retain, without cleaning.
- # This will be retained for num_of_commits * time_between_commits (scheduled).
- hoodie.cleaner.commits.retained=1
-
- # When set to true, cleaner also deletes the bootstrap base file when its skeleton base file is cleaned.
- hoodie.cleaner.delete.bootstrap.base.file=false
-
- hoodie.commits.archival.batch=60
-
- hoodie.archive.merge.small.file.limit.bytes=104857600
- # When set to true, compaction service is triggered after each write.
- # While being simpler operationally, this adds extra latency on the write path.
- hoodie.compact.inline=false
-
- hoodie.parquet.small.file.limit=124857600
-
- hoodie.cleaner.parallelism=800
-
- hoodie.cleaner.incremental.mode=true
-
- # Archiving service moves older entries from timeline into an archived log after each write,
- # to keep the metadata overhead constant, even as the table size grows
- hoodie.keep.max.commits=3
- hoodie.keep.min.commits=2
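The retention settings in this file depend on each other. To our understanding, Hudi expects hoodie.cleaner.commits.retained to be smaller than hoodie.keep.min.commits, which in turn must be smaller than hoodie.keep.max.commits; treat the exact rule as an assumption and confirm it against the documentation for your Hudi version. The sketch below checks that ordering on a properties file before the job is submitted; the function name check_retention is our own.

```shell
#!/usr/bin/env bash
# Read a .properties file on stdin and check the assumed ordering:
#   hoodie.cleaner.commits.retained < hoodie.keep.min.commits < hoodie.keep.max.commits
# Prints "ok" and returns 0 when it holds, otherwise prints the values and returns 1.
check_retention() {
  awk -F= '
    /^[ \t]*(#|$)/ { next }                 # skip comments and blank lines
    {
      key = $1; val = $2
      gsub(/[ \t]/, "", key); gsub(/[ \t]/, "", val)
      conf[key] = val + 0
    }
    END {
      r  = conf["hoodie.cleaner.commits.retained"]
      lo = conf["hoodie.keep.min.commits"]
      hi = conf["hoodie.keep.max.commits"]
      if (r < lo && lo < hi) { print "ok"; exit 0 }
      printf "bad ordering: retained=%d min=%d max=%d\n", r, lo, hi
      exit 1
    }
  '
}

# Hypothetical usage:
#   check_retention < /home/hadoop/hudi_clustering/hudi_cleaning.properties
```

The values used in this article (retained=1, min=2, max=3) satisfy the ordering.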
Run the cleaner with the following command:
- spark-submit \
- --class org.apache.hudi.utilities.HoodieCleaner \
- hdfs://nameservice1/utility_jars/hudi-utilities-bundle_2.12-0.10.0.jar \
- --props file:///home/hadoop/hudi_clustering/hudi_cleaning.properties \
- --target-base-path hdfs://nameservice1/flk_hudi/chdrpf_hudi_test03 \
- > /home/hadoop/hudi_clustering/clusteringjob_cleaning.log 2>&1
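Attach the streaming job
At this point the streaming job can be attached to the Hudi table. The effect of the cleaning becomes visible once the stream is writing to the table.
File counts after cleaning: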
- [hadoop@p0-tklfrna-tklrna-device02 hudi_clustering]$ hdfs dfs -count /flk_hudi/chdrpf_hudi_test03/*
- 39 2818 61047630 /flk_hudi/chdrpf_hudi_test03/.hoodie
- 1 5 295730057 /flk_hudi/chdrpf_hudi_test03/1
- 1 5 581449403 /flk_hudi/chdrpf_hudi_test03/2
- 1 5 541564433 /flk_hudi/chdrpf_hudi_test03/3
- 1 5 113526185 /flk_hudi/chdrpf_hudi_test03/4
- 1 5 819123981 /flk_hudi/chdrpf_hudi_test03/5
- 1 5 361258893 /flk_hudi/chdrpf_hudi_test03/6
- 1 4 205559110 /flk_hudi/chdrpf_hudi_test03/7
- 1 4 33721101 /flk_hudi/chdrpf_hudi_test03/8
- 1 5 352884732 /flk_hudi/chdrpf_hudi_test03/A
- 1 5 294248033 /flk_hudi/chdrpf_hudi_test03/B
- 1 5 771533591 /flk_hudi/chdrpf_hudi_test03/C
- 1 5 614827884 /flk_hudi/chdrpf_hudi_test03/D
- 1 5 157676833 /flk_hudi/chdrpf_hudi_test03/E
- 1 5 226004511 /flk_hudi/chdrpf_hudi_test03/F
- 1 5 198656601 /flk_hudi/chdrpf_hudi_test03/G
- 1 5 372307018 /flk_hudi/chdrpf_hudi_test03/H
- 1 5 97041611 /flk_hudi/chdrpf_hudi_test03/I
- 1 5 427390894 /flk_hudi/chdrpf_hudi_test03/J
- 1 5 78296341 /flk_hudi/chdrpf_hudi_test03/K
- 1 5 136428423 /flk_hudi/chdrpf_hudi_test03/L
- 1 5 53218521 /flk_hudi/chdrpf_hudi_test03/M
- 1 5 439899957 /flk_hudi/chdrpf_hudi_test03/N
- 1 5 242278011 /flk_hudi/chdrpf_hudi_test03/O
- 1 5 357549763 /flk_hudi/chdrpf_hudi_test03/P
- 1 5 200702230 /flk_hudi/chdrpf_hudi_test03/Q
- 1 5 265952714 /flk_hudi/chdrpf_hudi_test03/R
- 1 5 229783530 /flk_hudi/chdrpf_hudi_test03/S
- 1 5 191817537 /flk_hudi/chdrpf_hudi_test03/T
- 1 5 151138760 /flk_hudi/chdrpf_hudi_test03/U
- 1 5 221236895 /flk_hudi/chdrpf_hudi_test03/V
- 1 4112 2060894265 /flk_hudi/chdrpf_hudi_test03/W
- 1 4117 1910706738 /flk_hudi/chdrpf_hudi_test03/X
- 1 4169 2042792364 /flk_hudi/chdrpf_hudi_test03/Y
- 1 2221 995253322 /flk_hudi/chdrpf_hudi_test03/Z
- 1 1075 472877437 /flk_hudi/chdrpf_hudi_test03/a
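As shown, the small files in each clustered partition have been merged into large files, and as streaming data arrives the file count grows at a reasonable rate.
P.S. The last few partitions were left unclustered as a control group. To cover all partitions, increase the value of hoodie.clustering.plan.strategy.max.num.groups=30 in the Clustering configuration file; this raises the parallelism of the Spark job so that every partition is included in the clustering.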
- [hadoop@p0-tklfrna-tklrna-device02 hudi_clustering]$ hdfs dfs -count /flk_hudi/chdrpf_hudi_test03/
- 76 19050 17396389394 /flk_hudi/chdrpf_hudi_test03
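Observing the Timeline
20220826105913373.replacecommit marks the moment the clustering operation completed.
20220826114108591.clean marks the moment the cleaning operation completed.
20220826114317026.commit marks the moment a write of new data completed.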
- [hadoop@p0-tklfrna-tklrna-device02 hudi_clustering]$ hdfs dfs -ls /flk_hudi/chdrpf_hudi_test03/.hoodie
- Found 30 items
- drwxr-xr-x - hadoop supergroup 0 2022-08-26 10:10 /flk_hudi/chdrpf_hudi_test03/.hoodie/.aux
- drwxr-xr-x - hadoop supergroup 0 2022-08-26 11:46 /flk_hudi/chdrpf_hudi_test03/.hoodie/.temp
- -rw-r--r-- 3 hadoop supergroup 18596070 2022-08-26 10:14 /flk_hudi/chdrpf_hudi_test03/.hoodie/20220826101036547.commit
- -rw-r--r-- 3 hadoop supergroup 0 2022-08-26 10:10 /flk_hudi/chdrpf_hudi_test03/.hoodie/20220826101036547.commit.requested
- -rw-r--r-- 3 hadoop supergroup 0 2022-08-26 10:10 /flk_hudi/chdrpf_hudi_test03/.hoodie/20220826101036547.inflight
- -rw-r--r-- 3 hadoop supergroup 14041389 2022-08-26 10:16 /flk_hudi/chdrpf_hudi_test03/.hoodie/20220826101404432.commit
- -rw-r--r-- 3 hadoop supergroup 0 2022-08-26 10:14 /flk_hudi/chdrpf_hudi_test03/.hoodie/20220826101404432.commit.requested
- -rw-r--r-- 3 hadoop supergroup 0 2022-08-26 10:14 /flk_hudi/chdrpf_hudi_test03/.hoodie/20220826101404432.inflight
- -rw-r--r-- 3 hadoop supergroup 1435895 2022-08-26 11:09 /flk_hudi/chdrpf_hudi_test03/.hoodie/20220826105913373.replacecommit
- -rw-r--r-- 3 hadoop supergroup 0 2022-08-26 11:03 /flk_hudi/chdrpf_hudi_test03/.hoodie/20220826105913373.replacecommit.inflight
- -rw-r--r-- 3 hadoop supergroup 5685565 2022-08-26 10:59 /flk_hudi/chdrpf_hudi_test03/.hoodie/20220826105913373.replacecommit.requested
- -rw-r--r-- 3 hadoop supergroup 1009885 2022-08-26 11:37 /flk_hudi/chdrpf_hudi_test03/.hoodie/20220826113342082.commit
- -rw-r--r-- 3 hadoop supergroup 0 2022-08-26 11:33 /flk_hudi/chdrpf_hudi_test03/.hoodie/20220826113342082.commit.requested
- -rw-r--r-- 3 hadoop supergroup 0 2022-08-26 11:33 /flk_hudi/chdrpf_hudi_test03/.hoodie/20220826113342082.inflight
- -rw-r--r-- 3 hadoop supergroup 3811303 2022-08-26 11:40 /flk_hudi/chdrpf_hudi_test03/.hoodie/20220826113740364.commit
- -rw-r--r-- 3 hadoop supergroup 0 2022-08-26 11:37 /flk_hudi/chdrpf_hudi_test03/.hoodie/20220826113740364.commit.requested
- -rw-r--r-- 3 hadoop supergroup 0 2022-08-26 11:37 /flk_hudi/chdrpf_hudi_test03/.hoodie/20220826113740364.inflight
- -rw-r--r-- 3 hadoop supergroup 2940587 2022-08-26 11:43 /flk_hudi/chdrpf_hudi_test03/.hoodie/20220826114026452.commit
- -rw-r--r-- 3 hadoop supergroup 0 2022-08-26 11:40 /flk_hudi/chdrpf_hudi_test03/.hoodie/20220826114026452.commit.requested
- -rw-r--r-- 3 hadoop supergroup 0 2022-08-26 11:40 /flk_hudi/chdrpf_hudi_test03/.hoodie/20220826114026452.inflight
- -rw-r--r-- 3 hadoop supergroup 5005100 2022-08-26 11:41 /flk_hudi/chdrpf_hudi_test03/.hoodie/20220826114108591.clean
- -rw-r--r-- 3 hadoop supergroup 4260649 2022-08-26 11:41 /flk_hudi/chdrpf_hudi_test03/.hoodie/20220826114108591.clean.inflight
- -rw-r--r-- 3 hadoop supergroup 4260649 2022-08-26 11:41 /flk_hudi/chdrpf_hudi_test03/.hoodie/20220826114108591.clean.requested
- -rw-r--r-- 3 hadoop supergroup 2867542 2022-08-26 11:46 /flk_hudi/chdrpf_hudi_test03/.hoodie/20220826114317026.commit
- -rw-r--r-- 3 hadoop supergroup 0 2022-08-26 11:43 /flk_hudi/chdrpf_hudi_test03/.hoodie/20220826114317026.commit.requested
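The same listing can be condensed into an ordered list of completed actions, which makes the sequence of clustering, cleaning, and streaming commits easier to follow. This is a sketch under one assumption drawn from the listing above: a completed instant is a file named <timestamp>.<action> with no .requested or .inflight suffix. The function name completed_instants is our own.

```shell
#!/usr/bin/env bash
# Read `hdfs dfs -ls <base>/.hoodie` output on stdin and print
# "TIMESTAMP ACTION" for every completed instant, oldest first.
completed_instants() {
  awk '
    {
      n = split($NF, parts, "/"); name = parts[n]     # file name inside .hoodie
      if (name ~ /^[0-9]+\.[a-z]+$/ && name !~ /\.(inflight|requested)$/) {
        dot = index(name, ".")
        print substr(name, 1, dot - 1), substr(name, dot + 1)
      }
    }
  ' | sort    # timestamps have equal length, so lexical order is time order
}

# Hypothetical usage:
#   hdfs dfs -ls /flk_hudi/chdrpf_hudi_test03/.hoodie | completed_instants
```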