分享

基于Apache Hudi的多库多表实时入湖最佳实践

问题导读:
1、CDC数据实时如何写入MSK?
2、OLAP引擎如何查询Hudi表?
3、如何将Flink CDC发送数据到Kafka?
4、Flink Streaming Read 实时聚合如何实现?



1. 前言

CDC(Change Data Capture)从广义上讲所有能够捕获变更数据的技术都可以称为CDC,但本篇文章中对CDC的定义限定为以非侵入的方式实时捕获数据库的变更数据。例如:通过解析MySQL数据库的Binlog日志捕获变更数据,而不是通过SQL Query源表捕获变更数据。Hudi 作为最热的数据湖技术框架之一, 用于构建具有增量数据处理管道的流式数据湖。其核心的能力包括对象存储上数据行级别的快速更新和删除,增量查询(Incremental queries,Time Travel),小文件管理和查询优化(Clustering,Compactions,Built-in metadata),ACID和并发写支持。Hudi不是一个Server,它本身不存储数据,也不是计算引擎,不提供计算能力。其数据存储在S3(也支持其它对象存储和HDFS),Hudi来决定数据以什么格式存储在S3(Parquet,Avro,…), 什么方式组织数据能让实时摄入的同时支持更新,删除,ACID等特性。Hudi通过Spark,Flink计算引擎提供数据写入, 计算能力,同时也提供与OLAP引擎集成的能力,使OLAP引擎能够查询Hudi表。从使用上看Hudi就是一个JAR包,启动Spark, Flink作业的时候带上这个JAR包即可。Amazon EMR 上的Spark,Flink,Presto ,Trino原生集成Hudi, 且EMR的Runtime在Spark,Presto引擎上相比开源有2倍以上的性能提升。在多库多表的场景下(比如:百级别库表),当我们需要将数据库(mysql,postgres,sqlserver,oracle,mongodb等)中的数据通过CDC的方式以分钟级别(1minute+)延迟写入Hudi,并以增量查询的方式构建数仓层次,对数据进行实时高效的查询分析时。我们要解决三个问题,第一,如何使用统一的代码完成百级别库表CDC数据并行写入Hudi,降低开发维护成本。第二,源端Schema变更如何同步到Hudi表。第三,使用Hudi增量查询构建数仓层次比如ODS->DWD->DWS(各层均是Hudi表),DWS层的增量聚合如何实现。本篇文章推荐的方案是: 使用Flink CDC DataStream API(非SQL)先将CDC数据写入Kafka,而不是直接通过Flink SQL写入到Hudi表,主要原因如下,第一,在多库表且Schema不同的场景下,使用SQL的方式会在源端建立多个CDC同步线程,对源端造成压力,影响同步性能。第二,没有MSK做CDC数据上下游的解耦和数据缓冲层,下游的多端消费和数据回溯比较困难。CDC数据写入到MSK后,推荐使用Spark Structured Streaming DataFrame API或者Flink StatementSet 封装多库表的写入逻辑,但如果需要源端Schema变更自动同步到Hudi表,使用Spark Structured Streaming DataFrame API实现更为简单,使用Flink则需要基于HoodieFlinkStreamer做额外的开发。Hudi增量ETL在DWS层需要数据聚合的场景的下,可以通过Flink Streaming Read将Hudi作为一个无界流,通过Flink计算引擎完成数据实时聚合计算写入到Hudi表。


2. 架构设计与解析
2022-08-10_183550.jpg
2.1 CDC数据实时写入MSK

图中标号1,2是将数据库中的数据通过CDC方式实时发送到MSK(Amazon托管的Kafka服务)。flink-cdc-connectors[1]是当前比较流行的CDC开源工具。它内嵌debezium[2]引擎,支持多种数据源,对于MySQL支持Batch阶段(全量同步阶段)并行,无锁,Checkpoint(可以从失败位置恢复,无需重新读取,对大表友好)。支持Flink SQL API和DataStream API,这里需要注意的是如果使用SQL API对于库中的每张表都会单独创建一个链接,独立的线程去执行binlog dump。如果需要同步的表比较多,会对源端产生较大的压力。在需要整库同步表非常多的场景下,应该使用DataStream API写代码的方式只建一个binlog dump同步所有需要的库表。另一种场景是如果只同步分库分表的数据,比如user表做了分库,分表,其表Schema都是一样的,Flink CDC的SQL API支持正则匹配多个库表,这时使用SQL API同步依然只会建立一个binlog dump线程。需要说明的是通过Flink CDC可以直接将数据Sink到Hudi, 中间无需MSK,但考虑到上下游的解耦,数据的回溯,多业务端消费,多表管理维护,依然建议CDC数据先到MSK,下游再从MSK接数据写入Hudi。

2.2 CDC工具对比

图中标号3,除了flink-cdc-connectors之外,DMS(Amazon Database Migration Services)是Amazon 托管的数据迁移服务,提供多种数据源(mysql,oracle,sqlserver,postgres,mongodb,documentdb等)的CDC支持,支持可视化的CDC任务配置,运行,管理,监控。因此可以选择DMS作为CDC的解析工具,DMS支持将MSK或者自建Kafka作为数据投递的目标,所以CDC实时同步到MSK通过DMS可以快速可视化配置管理。当然除了DMS之外还有很多开源的CDC工具,也可以完成CDC的同步工作,但需要在EC2上搭建相关服务。下图列出了CDC工具的对比项,供大家参考

2022-08-10_183630.jpg

2.3 Spark Structured Streaming多库表并行写Hudi及Schema变更

图中标号4,CDC数据到了MSK之后,可以通过Spark/Flink计算引擎消费数据写入到Hudi表,我们把这一层我们称之为ODS层。无论Spark还是Flink都可以做到数据ODS层的数据落地,使用哪一个我们需要综合考量,这里阐述一些相对重要的点。首先对于Spark引擎,我们一定是使用Spark Structured Streaming 消费MSK写入Hudi,由于可以使用DataFrame API写Hudi, 因此在Spark中可以方便的实现消费CDC Topic并根据其每条数据中的元信息字段(数据库名称,表名称等)在单作业内分流写入不同的Hudi表,封装多表并行写入逻辑,一个Job即可实现整库多表同步的逻辑。样例代码截图如下,完整代码点击Github[3]获取

2022-08-10_183710.jpg

我们知道CDC数据中是带着I(insert)、U(update)、D(delete)信息的, 不同的CDC工具数据格式不同,但要表达的含义是一致的。使用Spark写入Hudi我们主要关注U、D信息,数据带着U信息表示该条数据是一个更新操作,对于Hudi而言只要设定源表的主键为Hudi的recordKey,同时根据需求场景设定precombineKey即可。这里对precombineKey做一个说明,它表示的是当数据需要更新时(recordKey相同), 默认选择两条数据中precombineKey的大保留在Hudi中。其实Hudi有非常灵活的Payload机制,通过参数hoodie.datasource.write.payload.class可以选择不同的Payload实现,比如Partial Update(部分字段更新)的Payload实现OverwriteNonDefaultsWithLatestAvroPayload,也可以自定义Payload实现类,它核心要做的就是如何根据precombineKey指定的字段更新数据。所以对于CDC数据Sink Hudi而言,我们需要保证上游的消息顺序,只要我们表中有能判断哪条数据是最新的数据的字段即可,那这个字段在MySQL中往往我们设计成数据更新时间modify_time timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP 。如果没有类似字段,建议定义设计规范加上这个字段,否则就必须保证数据有序(这会给架构设计和性能带来更多的阻力),不然数据在Hudi中Updata的结果可能就是错的。对于带着D信息的数据,它表示这条数据在源端被删除,Hudi是提供删除能力的,其中一种方式是当一条数据中包含_hoodie_is_deleted字段,且值为true是,Hudi会自动删除此条数据,这在Spark Structured Streaming 代码中很容易实现,只需在map操作实现添加一个字段且当数据中包含D信息设定字段值为true即可。

2.4 Flink StatementSet多库表CDC并行写Hudi

对于使用Flink引擎消费MSK中的CDC数据落地到ODS层Hudi表,如果想要在一个JOB实现整库多张表的同步,Flink StatementSet来实现通过一个Kafka的CDC Source表,根据元信息选择库表Sink到Hudi中。但这里需要注意的是由于Flink和Hudi集成,是以SQL方式先创建表,再执行Insert语句写入到该表中的,如果需要同步的表有上百之多,封装一个自动化的逻辑能够减轻我们的工作,你会发现SQL方式写入Hudi虽然对于单表写入使用上很方便,不用编程只需要写SQL即可,但也带来了一些限制,由于写入Hudi时是通过SQL先建表,Schema在建表时已将定义,如果源端Schema变更,通过SQL方式是很难实现下游Hudi表Schema的自动变更的。虽然在Hudi的官网并未提供Flink DataStream API写入Hudi的例子,但Flink写入Hudi是可以通过HoodieFlinkStreamer以DataStream API的方式实现,在Hudi源码[4]中可以找到。因此如果想要更加灵活简单的实现多表的同步,以及Schema的自动变更,需要自行参照HoodieFlinkStreamer代码以DataStream API的方式写Hudi。对于I,U,D信息,Flink的debezium ,maxwell,canal format会直接将消息解析 为Flink的changelog流,换句话说就是Flink会将I,U,D操作直接解析成Flink内部的数据结构RowData,直接Sink到Hudi表即可,我们同样需要在SQL中设定recordKey,precombineKey,也可以设定Payload class的不同实现类。

2.5 Flink Streaming Read模式读Hudi实现ODS层聚合

图中标号5,数据通过Spark/Flink落地到ODS层后,我们可能需要构建DWD和DWS层对数据做进一步的加工处理,(DWD和DWS并非必须的,根据你的场景而定,你可以直接让OLAP引擎查询ODS层的Hudi表)我们希望能够使用到Hudi的增量查询能力,只查询变更的数据来做后续DWD和DWS的ETL,这样能够加速构建同时减少资源消耗。对于Spark引擎,在DWD层如果仅仅是对数据做map,fliter等相关类型操作,是可以使用增量查询的,但如果DWD层的构建有Join操作,是无法通过增量查询实现的,只能全表(或者分区)扫描。DWS层的构建如果聚合类型的操作没有去重,窗口类型的操作,只是SUM, AVG,MIN, MAX等类型的操作,可以通过增量查询之后和目标表做Merge实现,反之,只能全表(或者分区)扫描。对于Flink引擎来构建DWD和DWS, 由于Flink 支持Hudi表的streaming read, 在SQL设定read.streaming.enabled= true,changelog.enabled=true等相关流式读取的参数即可。设定后Flink把Hudi表当做了一个无界的changelog流表,无论怎样做ETL都是支持的,Flink会自身存储状态信息,整个ETL的链路是流式的。

2.6 OLAP引擎查询Hudi表

图中标号6, EMR Hive/Presto/Trino 都可以查询Hudi表,但需要注意的是不同引擎对于查询的支持是不同的,参见官网[5],这些引擎对于Hudi表只能查询,不能写入。关于Schema的自动变更,首先Hudi自身是支持Schema Evolution[6],我们想要做到源端Schema变更自动同步到Hudi表,通过上文的描述,可以知道如果使用Spark引擎,可以通过DataFrame API操作数据,通过from_json动态生成DataFrame,因此可以较为方便的实现自动添加列。如果使用Flink引擎上文已经说明想要自动实现Schema的变更,通过HoodieFlinkStreamer以DataStream API的方式实现Hudi写入的同时融入Schema变更的逻辑。

3. EMR CDC整库同步Demo

接下的Demo操作中会选择RDS MySQL作为数据源,Flink CDC DataStream API 同步库中的所有表到Kafka,使用Spark引擎消费Kafka中binlog数据实现多表写入ODS层Hudi,使用Flink引擎以streaming read的模式做DWD和DWS层的Hudi表构建。

3.1 环境信息
  1. EMR 6.6.0
  2. Hudi 0.10.0
  3. Spark 3.2.0
  4. Flink 1.14.2  
  5. Presto 0.267
  6. MySQL 5.7.34
复制代码

3.2 创建源表

在MySQL中创建test_db库及user,product,user_order三张表,插入样例数据,后续CDC先加载表中已有的数据,之后源添加新数据并修改表结构添加新字段,验证Schema变更自动同步到Hudi表。
  1. -- create databases
  2. create database if not exists test_db default character set utf8mb4 collate utf8mb4_general_ci;
  3. use test_db;
  4. -- create  user table
  5. drop table if exists user;
  6. create table if not exists user
  7. (
  8.     id           int auto_increment primary key,
  9.     name         varchar(155)                        null,
  10.     device_model varchar(155)                        null,
  11.     email        varchar(50)                         null,
  12.     phone        varchar(50)                         null,
  13.     create_time  timestamp default CURRENT_TIMESTAMP not null,
  14.     modify_time  timestamp default CURRENT_TIMESTAMP null on update CURRENT_TIMESTAMP
  15. )charset = utf8mb4;
  16. -- insert data
  17. insert into user(name,device_model,email,phone) values
  18. ('customer-01','dm-01','abc01@email.com','188776xxxxx'),
  19. ('customer-02','dm-02','abc02@email.com','166776xxxxx');
  20. -- create product table
  21. drop table if exists product;
  22. create table if not exists product
  23. (
  24.     pid          int not null primary key,
  25.     pname        varchar(155)                        null,
  26.     pprice       decimal(10,2)                           ,
  27.     create_time  timestamp default CURRENT_TIMESTAMP not null,
  28.     modify_time  timestamp default CURRENT_TIMESTAMP null on update CURRENT_TIMESTAMP
  29. )charset = utf8mb4;
  30. -- insert data
  31. insert into product(pid,pname,pprice) values
  32. ('1','prodcut-001',125.12),
  33. ('2','prodcut-002',225.31);
  34. -- create order table
  35. drop table if exists user_order;
  36. create table if not exists user_order
  37. (
  38.     id           int auto_increment primary key,
  39.     oid          varchar(155)                        not null,
  40.     uid          int                                         ,
  41.     pid          int                                         ,
  42.     onum         int                                         ,
  43.     create_time  timestamp default CURRENT_TIMESTAMP not null,
  44.     modify_time  timestamp default CURRENT_TIMESTAMP null on update CURRENT_TIMESTAMP
  45. )charset = utf8mb4;
  46. -- insert data
  47. insert into user_order(oid,uid,pid,onum) values
  48. ('o10001',1,1,100),
  49. ('o10002',1,2,30),
  50. ('o10001',2,1,22),
  51. ('o10002',2,2,16);
  52. -- select data
  53. select * from user;
  54. select * from product;
  55. select * from user_order;
复制代码


2022-08-10_183749.jpg

3.3 Flink CDC发送数据到Kafka

使用DataStream API编写CDC同步程序。样例代码Github[7]

# 创建topic
  1. kafka-topics.sh --create --zookeeper ${zk}  --replication-factor 2 --partitions 8  --topic cdc_topic
复制代码

# 下载代码,编译打包
  1. mvn clean package  -Dscope.type=provided  -DskipTests
复制代码

# 也可以使用已经打好的包,进入EMR主节点,执行命令
  1. wget https://dxs9dnjebzm6y.cloudfront.net/tmp/emr-flink-cdc-1.0-SNAPSHOT.jar
  2. # disalbe check-leaked-classloader
  3. sudo sed -i -e '$a\classloader.check-leaked-classloader: false' /etc/flink/conf/flink-conf.yaml
复制代码

# 启动flink cdc 发送数据到Kafka
  1. sudo flink run -m yarn-cluster \
  2. -yjm 1024 -ytm 2048 -d \
  3. -ys 4 -p 8 \
  4. -c  com.aws.analytics.MySQLCDC  \
  5. /home/hadoop/emr-flink-cdc-1.0-SNAPSHOT.jar \
  6. -b xxxxx.amazonaws.com:9092 \
  7. -t cdc_topic_001 \
  8. -c s3://xxxxx/flink/checkpoint/ \
  9. -l 30 -h xxxxx.rds.amazonaws.com:3306 -u admin \
  10. -P admin123456 \
  11. -d test_db -T test_db.* \
  12. -p 4 \
  13. -e 5400-5408
复制代码

# 相关的参数说明如下
  1. MySQLCDC 1.0
  2. Usage: MySQLCDC [options]
  3.   -c, --checkpointDir <value>
  4.                            checkpoint dir
  5.   -l, --checkpointInterval <value>
  6.                            checkpoint interval: default 60 seconds
  7.   -b, --brokerList <value>
  8.                            kafka broker list,sep comma
  9.   -t, --sinkTopic <value>  kafka topic
  10.   -h, --host <value>       mysql hostname, eg. localhost:3306
  11.   -u, --username <value>   mysql username
  12.   -P, --pwd <value>        mysql password
  13.   -d, --dbList <value>     cdc database list: db1,db2,..,dbn
  14.   -T, --tbList <value>     cdc table list: db1.*,db2.*,db3.tb*...,dbn.*
  15.   -p, --parallel <value>   cdc source parallel
  16.   -s, --position <value>   cdc start position: initial or latest,default: initial
  17.   -e, --serverId <value>   cdc server id
复制代码

# 消费Kafka topic 观察数据
  1. <div>./kafka_2.12-2.6.2/bin/kafka-console-consumer.sh --bootstrap-server $brok --topic cdc_topic_001 --from-beginning |jq .</div><div></div>
复制代码


2022-08-10_183828.jpg
2022-08-10_183847.jpg

3.4 Spark 消费CDC数据整库同步

# 整库同步样例代码  https://github.com/yhyyz/emr-hud ... Debezium2Hudi.scala

# 下载代码,编译打包
  1. mvn clean package  -Dscope.type=provided  -DskipTests
复制代码

# 也可以使用已经打好的包,进入EMR主节点,执行命令
  1. wget https://dxs9dnjebzm6y.cloudfront.net/tmp/emr-hudi-example-1.0-SNAPSHOT-jar-with-dependencies.jar
复制代码

# 执行如下命令提交作业,命令中设定-s hms,hudi表同步到Glue Catalog
  1. spark-submit  --master yarn \
  2. --deploy-mode client \
  3. --driver-memory 1g \
  4. --executor-memory 1g \
  5. --executor-cores 2 \
  6. --num-executors  2 \
  7. --conf "spark.dynamicAllocation.enabled=false" \
  8. --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \
  9. --conf "spark.sql.hive.convertMetastoreParquet=false" \
  10. --jars  /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar \
  11. --class com.aws.analytics.Debezium2Hudi /home/hadoop/emr-hudi-example-1.0-SNAPSHOT-jar-with-dependencies.jar \
  12. -e prod -b xxxxx.amazonaws.com:9092 \
  13. -t cdc_topic_001 -p emr-cdc-group-02 -s true \
  14. -o earliest \
  15. -i 60 -y cow -p 10 \
  16. -c s3://xxxxx/spark-checkpoint/emr-hudi-cdc-005/ \
  17. -g s3://xxxxx/emr-hudi-cdc-005/ \
  18. -r jdbc:hive2://localhost:10000  \
  19. -n hadoop -w upsert  \
  20. -s hms \
  21. --concurrent false \
  22. -m "{"tableInfo":[{"database":"test_db","table":"user","recordKey":"id","precombineKey":"modify_time","partitionTimeColumn":"create_time","hudiPartitionField":"year_month"},
  23. {"database":"test_db","table":"user_order","recordKey":"id","precombineKey":"modify_time","partitionTimeColumn":"create_time","hudiPartitionField":"year_month"},{"database":"test_db","table":"product","recordKey":"pid","precombineKey":"modify_time","partitionTimeColumn":"create_time","hudiPartitionField":"year_month"}]}"
复制代码

# 相关参数说明如下:
  1. Debezium2Hudi 1.0
  2. Usage: spark ss Debezium2Hudi [options]
  3.   -e, --env <value>        env: dev or prod
  4.   -b, --brokerList <value>
  5.                            kafka broker list,sep comma
  6.   -t, --sourceTopic <value>
  7.                            kafka topic
  8.   -p, --consumeGroup <value>
  9.                            kafka consumer group
  10.   -s, --syncHive <value>   whether sync hive,default:false
  11.   -o, --startPos <value>   kafka start pos latest or earliest,default latest
  12.   -m, --tableInfoJson <value>
  13.                            table info json str
  14.   -i, --trigger <value>    default 300 second,streaming trigger interval
  15.   -c, --checkpointDir <value>
  16.                            hdfs dir which used to save checkpoint
  17.   -g, --hudiEventBasePath <value>
  18.                            hudi event table hdfs base path
  19.   -y, --tableType <value>  hudi table type MOR or COW. default COW
  20.   -t, --morCompact <value>
  21.                            mor inline compact,default:true
  22.   -m, --inlineMax <value>  inline max compact,default:20
  23.   -r, --syncJDBCUrl <value>
  24.                            hive server2 jdbc, eg. jdbc:hive2://localhost:10000
  25.   -n, --syncJDBCUsername <value>
  26.                            hive server2 jdbc username, default: hive
  27.   -p, --partitionNum <value>
  28.                            repartition num,default 16
  29.   -w, --hudiWriteOperation <value>
  30.                            hudi write operation,default insert
  31.   -u, --concurrent <value>
  32.                            write multiple hudi table concurrent,default false
  33.   -s, --syncMode <value>   sync mode,default jdbc, glue catalog set dms
  34.   -z, --syncMetastore <value>
  35.                            hive metastore uri,default thrift://localhost:9083
复制代码

# 下图可以看到表已经同步到Glue Catalog ,数据已经写入到S3
2022-08-10_183951.jpg
2022-08-10_184020.jpg

  1. -- 向MySQL的user表中添加一列,并插入一条新数据, 查询hudi表,可以看到新列和数据已经自动同步到user表,注意以下SQL在MySQL端执行
  2. alter table user add column age int
  3. insert into user(name,device_model,email,phone,age) values
  4. ('customer-03','dm-03','abc03@email.com','199776xxxxx',18);
复制代码


2022-08-10_184107.jpg

3.5 Flink Streaming Read 实时聚合

# 注意最后一个参数,-t 是把/etc/hive/conf/hive-site.xml 加入到classpath,这样hudi执行表同步到Glue是就可以加入加载到这个配置,配置中的关键是 hive.metastore.client.factory.class = com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory,这样就可以加载用到Glue的Catalog实现. 如果EMR集群启动时就选择了Glue Metastore,该文件中/etc/hive/conf/hive-site.xml 已经配置了AWSGlueDataCatalogHiveClientFactory. 如果启动EMR没有选择Glue Metastore,还需要同步数据到Glue,需要手动加上。

  1. # 注意替换为你的S3 Bucket
  2. checkpoints=s3://xxxxx/flink/checkpoints/datagen/
  3. flink-yarn-session -jm 1024 -tm 4096 -s 2  \
  4. -D state.backend=rocksdb \
  5. -D state.checkpoint-storage=filesystem \
  6. -D state.checkpoints.dir=${checkpoints} \
  7. -D execution.checkpointing.interval=5000 \
  8. -D state.checkpoints.num-retained=5 \
  9. -D execution.checkpointing.mode=EXACTLY_ONCE \
  10. -D execution.checkpointing.externalized-checkpoint-retention=RETAIN_ON_CANCELLATION \
  11. -D state.backend.incremental=true \
  12. -D execution.checkpointing.max-concurrent-checkpoints=1 \
  13. -D rest.flamegraph.enabled=true \
  14. -d \
  15. -t /etc/hive/conf/hive-site.xml
  16. # 启动Flink sql client
  17. /usr/lib/flink/bin/sql-client.sh embedded -j /usr/lib/hudi/hudi-flink-bundle.jar shell
  18. -- user表,开启streaming read, changelog.enalbe=true
  19. set sql-client.execution.result-mode=tableau;
  20. CREATE TABLE `user`(
  21.     id string,
  22.     name STRING,
  23.     device_model STRING,
  24.     email STRING,
  25.     phone STRING,
  26.     age string,
  27.     create_time STRING,
  28.     modify_time STRING,
  29.     year_month STRING
  30. )
  31. PARTITIONED BY (`year_month`)
  32. WITH (
  33.   'connector' = 'hudi',
  34.   'path' = 's3://xxxxx/emr-hudi-cdc-005/test_db/user/',
  35.   'hoodie.datasource.write.recordkey.field' = 'id',
  36.   'table.type' = 'COPY_ON_WRITE',
  37.   'index.bootstrap.enabled' = 'true',
  38.   'read.streaming.enabled' = 'true',
  39.   'read.start-commit' = '20220607014223',
  40.   'changelog.enabled' = 'false',
  41.   'read.streaming.check-interval' = '1'
  42. );
  43. # 实时查询数据
  44. select * from `user`;
  45. # 在MySQL中修改user表中id=3的name为new-customer-03,注意以下SQL在MySQL端执行
  46. update  user set name="new-customer-03" where id=3;
复制代码

# 在Flink 端可以可以看到数据变更

2022-08-10_184143.jpg

  1. -- Flink聚合操作Sink到Hudi表
  2. -- batch
  3. CREATE TABLE  user_agg(
  4. num BIGINT,
  5. device_model STRING
  6. )WITH(
  7.   'connector' = 'hudi',
  8.   'path' = 's3://xxxxx/emr-cdc-hudi/user_agg/',
  9.   'table.type' = 'COPY_ON_WRITE',  
  10.   'write.precombine.field' = 'device_model',
  11.   'write.operation' = 'upsert',
  12.   'hoodie.datasource.write.recordkey.field' = 'device_model',
  13.   'hive_sync.database' = 'dws',
  14.   'hive_sync.enable' = 'true',
  15.   'hive_sync.table' = 'user_agg',
  16.   'hive_sync.mode' = 'HMS',
  17.   'hive_sync.use_jdbc' = 'false',
  18.   'hive_sync.username' = 'hadoop'
  19. );
复制代码


2022-08-10_184213.jpg

4. 总结

本篇文章讲解了如何通过EMR实现CDC数据入湖及Schema的自动变更。通过Flink CDC DataStream API先将整库数据发送到MSK,这时CDC在源端只有一个binlog dump线程,降低对源端的压力。使用Spark Structured Streaming 动态解析数据写入到Hudi表来实现Shema的自动变更,实现单个Job管理多表Sink, 多表情况下降低开发维护成本,可以并行或者串行写多张Hudi表,元数据同步Glue Catalog。使用Flink Hudi的Streaming Read 模式实现实时数据ETL,满足DWD和DWS层的实时Join和聚合的需求。Amazon EMR环境中原生集成Hudi, 使用Amazon EMR轻松构建了整库同步的Demo。


作者:潘超
来源:https://mp.weixin.qq.com/s/1WkzdrAH4MB5XS1Dp6FivA

最新经典文章,欢迎关注公众号


没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条