分享

Apache Hudi + Flink作业运行指南

问题导读:
1、如何从 Kafka 读取数据写出到Hudi表?
2、启动时需要准备什么?
3、如何启动任务并查看进程?


近日Apache Hudi社区合并了Flink引擎的基础实现(HUDI-1327),这意味着 Hudi 开始支持 Flink 引擎。有很多小伙伴在交流群里咨询 Hudi on Flink 的使用姿势,三言两语不好描述,不如实操演示一把,于是有了这篇文章。

当前 Flink 版本的Hudi还只支持读取 Kafka 数据,Sink到 COW(COPY_ON_WRITE) 类型的 Hudi 表中,其他功能还在继续完善中。

这里我们简要介绍下如何从 Kafka 读取数据写出到Hudi表。

1. 打包

由于还没有正式发布, 我们需要到Github下载源码自行打包。

  1. git clone https://github.com/apache/hudi.git && cd hudi
  2. mvn clean package -DskipTests
复制代码

Windows 系统用户打包时会报如下错误:

  1. [ERROR] Failed to execute goal org.codehaus.mojo:exec-maven-plugin:1.6.0:exec (Setup HUDI_WS) on project hudi-integ-test: Command execution failed. Cannot run program "\bin\bash" (in directory "D:\github\hudi\hudi-integ-test"): Crea
  2. teProcess error=2, 系统找不到指定的文件。 -> [Help 1]
  3. [ERROR]
  4. [ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
  5. [ERROR] Re-run Maven using the -X switch to enable full debug logging.
  6. [ERROR]
  7. [ERROR] For more information about the errors and possible solutions, please read the following articles:
  8. [ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoExecutionException
  9. [ERROR]
  10. [ERROR] After correcting the problems, you can resume the build with the command
  11. [ERROR]   mvn <goals> -rf :hudi-integ-test
复制代码

这是 hudi-integ-test 模块的一个bash脚本无法执行导致的错误,我们可以把它注释掉(或者不用管)。

修改D:\github\hudi\pom.xml根pom文件

  1. <modules>
  2.     <module>hudi-common</module>
  3.     <module>hudi-cli</module>
  4.     <module>hudi-client</module>
  5.     <module>hudi-hadoop-mr</module>
  6.     <module>hudi-spark</module>
  7.     <module>hudi-timeline-service</module>
  8.     <module>hudi-utilities</module>
  9.     <module>hudi-sync</module>
  10.     <module>packaging/hudi-hadoop-mr-bundle</module>
  11.     <module>packaging/hudi-hive-sync-bundle</module>
  12.     <module>packaging/hudi-spark-bundle</module>
  13.     <module>packaging/hudi-presto-bundle</module>
  14.     <module>packaging/hudi-utilities-bundle</module>
  15.     <module>packaging/hudi-timeline-server-bundle</module>
  16.     <module>docker/hoodie/hadoop</module>
  17. <!--    <module>hudi-integ-test</module>-->
  18. <!--    <module>packaging/hudi-integ-test-bundle</module>-->
  19.     <module>hudi-examples</module>
  20.     <module>hudi-flink</module>
  21.     <module>packaging/hudi-flink-bundle</module>
  22.   </modules>
复制代码

再次执行 mvn clean package -DskipTests, 执行成功后,找到这个jar : D:\github\hudi\packaging\hudi-flink-bundle\target\hudi-flink-bundle_2.11-0.6.1-SNAPSHOT.jar (笔者Hudi源码在D:\github\ 路径下,大家根据自己实际路径找一下)

这个 hudi-flink-bundle_2.11-0.6.1-SNAPSHOT.jar 就是我们需要使用的flink客户端,类似于原版的 hudi-utilities-bundle_2.11-x.x.x.jar 。


2. 入参介绍

有几个必传的参数介绍下:

&#8226;--kafka-topic :Kafka 主题
&#8226;--kafka-group-id :消费组
&#8226;--kafka-bootstrap-servers : Kafka brokers
&#8226;--target-base-path : Hudi 表基本路径
&#8226;--target-table :Hudi 表名
&#8226;--table-type :Hudi 表类型
&#8226;--props : 任务配置

其他参数可以参考 org.apache.hudi.HoodieFlinkStreamer.Config,里面每个参数都有介绍 。


3. 启动准备清单

1.Kafka 主题,消费组
2.jar上传到服务器
3.schema 文件
4.Hudi任务配置文件

注意根据自己的配置把配置文件放到合适的地方,笔者的 hudi-conf.properties和schem.avsc文件均上传在HDFS。

  1. -rw-r--r-- 1 user user      592 Nov 19 09:32 hudi-conf.properties
  2. -rw-r--r-- 1 user user 39086937 Nov 30 15:51 hudi-flink-bundle_2.11-0.6.1-SNAPSHOT.jar
  3. -rw-r--r-- 1 user user 1410 Nov 17 17:52 schema.avsc
复制代码

hudi-conf.properties内容如下

  1. hoodie.datasource.write.recordkey.field=uuid
  2. hoodie.datasource.write.partitionpath.field=ts
  3. bootstrap.servers=xxx:9092
  4. hoodie.deltastreamer.keygen.timebased.timestamp.type=EPOCHMILLISECONDS
  5. hoodie.deltastreamer.keygen.timebased.output.dateformat=yyyy/MM/dd
  6. hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator
  7. hoodie.embed.timeline.server=false
  8. hoodie.deltastreamer.schemaprovider.source.schema.file=hdfs://olap/hudi/test/config/flink/schema.avsc
  9. hoodie.deltastreamer.schemaprovider.target.schema.file=hdfs://olap/hudi/test/config/flink/schema.avsc
复制代码

schema.avsc内容如下

  1. {
  2.   "type":"record",
  3.   "name":"stock_ticks",
  4.   "fields":[{
  5.      "name": "uuid",
  6.      "type": "string"
  7.   }, {
  8.      "name": "ts",
  9.      "type": "long"
  10.   }, {
  11.      "name": "symbol",
  12.      "type": "string"
  13.   },{
  14.      "name": "year",
  15.      "type": "int"
  16.   },{
  17.      "name": "month",
  18.      "type": "int"
  19.   },{
  20.      "name": "high",
  21.      "type": "double"
  22.   },{
  23.      "name": "low",
  24.      "type": "double"
  25.   },{
  26.      "name": "key",
  27.      "type": "string"
  28.   },{
  29.      "name": "close",
  30.      "type": "double"
  31.   }, {
  32.      "name": "open",
  33.      "type": "double"
  34.   }, {
  35.      "name": "day",
  36.      "type":"string"
  37.   }
  38. ]}
复制代码

4. 启动任务

  1. /opt/flink-1.11.2/bin/flink run -c org.apache.hudi.HoodieFlinkStreamer -m yarn-cluster -d -yjm 1024 -ytm 1024 -p 4 -ys 3 -ynm hudi_on_flink_test hudi-flink-bundle_2.11-0.6.1-SNAPSHOT.jar --kafka-topic hudi_test_flink --kafka-group-id hudi_on_flink --kafka-bootstrap-servers xxx:9092 --table-type COPY_ON_WRITE --target-base-path hdfs://olap/hudi/test/data/hudi_on_flink --target-table hudi_on_flink  --props hdfs://olap/hudi/test/config/flink/hudi-conf.properties --checkpoint-interval 3000 --flink-checkpoint-path hdfs://olap/hudi/hudi_on_flink_cp
复制代码

查看监控页面,任务已经跑起来了

1.png

现在在Hdfs路径下已经创建了一个空表(Hudi自动创建)

2.png

我们向 topic 中发数据(发了 900 条,本地写的 Producer 就不贴代码了)

3.png

我们查一下结果:

  1. @Test
  2.   public void query() {
  3.     spark.read().format("hudi")
  4.         .load(basePath + "/*/*/*/*")
  5.         .createOrReplaceTempView("tmp_view");
  6.     spark.sql("select * from tmp_view limit 2").show();
  7.     spark.sql("select count(1) from tmp_view").show();
  8.   }
复制代码
  1. +-------------------+--------------------+--------------------+----------------------+--------------------+--------------------+-------------+--------------------+----+-----+-------------------+------------------+------+------------------+-------------------+---+
  2. |_hoodie_commit_time|_hoodie_commit_seqno|  _hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|                uuid|           ts|              symbol|year|month|               high|               low|   key|             close|               open|day|
  3. +-------------------+--------------------+--------------------+----------------------+--------------------+--------------------+-------------+--------------------+----+-----+-------------------+------------------+------+------------------+-------------------+---+
  4. |     20201130162542| 20201130162542_0_20|01e11b9c-012a-461...|            2020/10/29|c8f3a30a-0523-4c8...|01e11b9c-012a-461...|1603947341061|12a-4614-89c3-f62...| 120|   10|0.45757580489415417|0.0816472025173598|01e11b|0.5795817262998396|0.15864898816336837|  1|
  5. |     20201130162542| 20201130162542_0_21|22e96b41-344a-4be...|            2020/10/29|c8f3a30a-0523-4c8...|22e96b41-344a-4be...|1603921161580|44a-4be2-8454-832...| 120|   10| 0.6200960168557579| 0.946080636091312|22e96b|0.6138608980526853| 0.5445994550724997|  1|
  6. +-------------------+--------------------+--------------------+----------------------+--------------------+--------------------+-------------+--------------------+----+-----+-------------------+------------------+------+------------------+-------------------+---+
复制代码
  1. +--------+
  2. |count(1)|
  3. +--------+
  4. |     900|
  5. +--------+
复制代码

5. 总结

本文简要介绍了使用 Flink 引擎将数据写出到Hudi表的过程。主要包括自主打可执行jar、启动参数介绍、Schema配置、Hudi任务参数配置等步骤


最新经典文章,欢迎关注公众号



---------------------

作者:王祥虎
来源:weixin
原文:Apache Hudi + Flink作业运行指南


已有(1)人评论

跳转到指定楼层
若无梦何远方 发表于 2021-8-22 11:18:00
看完之后还是模模糊糊的
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条