分享

基于 Apache Flink Table Store 的全增量一体实时入湖

本帖最后由 fc013 于 2022-11-18 18:10 编辑


问题导读:

1、数据入湖(仓)有哪些发展阶段?
2、为什么选择 Flink Table Store?
3、怎样实现基于 Apache Flink Table Store 的全增量一体入湖?





01 数据入湖(仓)发展阶段及面临痛点

基于数据库的数据集成过程,简要来说经历了如下几个阶段。

1.1 全量 + 定期增量的数据入仓

640.png
Fig.1 数据入仓: 一次全量+周期增量

全量数据通过 bulk load 一次性导入,定时调度增量同步任务从数据库同步增量到临时表,再与全量数据进行合并。这种方式虽然能满足一定的业务需求,但是也存在以下问题。

⚠️ 链路复杂,时效性差

全量与增量需要定期的合并以获取最新的数据快照,由于不支持记录级别的更新,用户需要额外的 SQL 任务去计算去重;数据新鲜度依赖于调度,若数据存在晚到,则还需要处理数据漂移情况,一种常见处理方式是在 T + N 时刻(processing time)调度产生 T (event time)的合并任务,同时取 T ~ T + N 个分区(processing time),再从中过滤出业务时间小于等于 T (event time)的数据进行合并,这会导致数据新鲜度进一步降低。

⚠️ 明细查询慢,排查问题难

虽然下游会使用各种交互式分析引擎来加速查询,但基于成本考虑,底表明细数据一般没有这种待遇,这就导致在数据正确性排查时需要直接查询明细,特别是需要查询合并前后全量和增量的明细变化来定位问题。如果业务变更,导致一批订单数据需要订正并要求生成订正之后的各类指标,则需要手工对原始表及其下游依赖表进行级联订正。

1.2 全量 + 实时增量的数据入湖

相对于传统数据仓库,数据湖的出现使得数据在以低成本存储的同时,数据新鲜度有了极大的提升。以 Apache Hudi 为例,它支持先做一次全量 bootstrap 构建基础表,然后基于新接入的 CDC 数据进行实时构建 [1],如 Fig.2 所示。

640 (1).png
Fig.2 数据入湖: 一次全量+实时增量

由于支持记录级别的更新及删除,在存储侧就可以完成主键的去重,不再需要额外的合并任务。在数据新鲜度方面,由于流式作业会定期的触发 checkpoint 来产生全量与增量合并后的快照,故而数据新鲜度对比第一种方式(以天或小时调度产生合并快照)有了很大提升。

但从另外一方面,我们也发现这种方式有以下这些问题。

🤔 Bootstrap Index 超时及 state 膨胀

以流模式启动 Flink 增量同步作业后,系统会先将全量表导入到 Flink state 来构建 Hoodie key(即主键 + 分区路径)到写入文件的 file group 的索引,此过程会阻塞 checkpoint 完成。而只有在 checkpoint 成功后,写入的数据才可以变为可读状态,故而当全量数据很大时,有可能会出现 checkpoint 一直超时的情况,导致下游读不到数据。另外,由于索引一直保存在 state 内,在增量同步阶段遇到了 insert 类型的记录也会更新索引,需要合理评估 state TTL,配置太小可能会丢失数据,配置过大可能导致 state 膨胀。

🤔 链路依然复杂,难以对齐增量点位,自动化运维成本高

全量 + 实时增量的方式并没有简化链路的复杂度,因为它额外引入了 Kafka 的运维,需要手工对齐增量消费的点位以防止数据丢失 [2]。在启动增量 CDC 作业后,用户需要等待和观察作业的运行状态,在第一次 checkpoint 成功后,暂停作业(stop-with-savepoint)修改配置禁用 Bootstrap Index,然后从 savepoint 重启作业( restore-from-savepoint)。整个过程操作复杂,实现自动化运维成本比较高。

此外,我们回顾了一些使用 Hudi 的行业实践,发现用户需要格外注意各项配置来实现不同需求,这对易用性有一定的伤害。比如 [1] 中提到的需要在平台层面监控用户的建表语句,防止在大规模写入场景配置为 COW(Copy on Write) 模式;全增量切换时用户必须格外注意 Kafka 消费点位来保证数据准确性,参数配置极大影响了作业的数据准确性及性能。

02 基于 Apache Flink Table Store 的全增量一体入湖

随着基于日志的 CDC 逐步取代基于查询的 CDC,特别是 Flink SQL CDC 在 source 端已支持全增量一体同步后,全增量一体入湖(使用一个流作业完成全量同步、并持续监听增量 changelog)也成为一个新的探索方向。这种方式降低了链路复杂度,同时将全增量切换时需要手工对齐 offset 的繁琐托管给了 Flink CDC 和 checkpoint 机制,让框架层面去保障数据的最终一致性。但经过调研我们发现,在使用 Hudi 做这种尝试时遇到了以下挑战。

🤔 全量同步阶段数据乱序严重,写入性能和稳定性难以保障

在全量同步阶段面临的一个问题是多并发同时读取 chunk 会遇到严重的数据乱序,出现同时写多个分区的情况,大量的随机写入会导致性能回退,出现吞吐毛刺,每个分区对应的 writer 都要维护各自缓存,很容易发生 OOM 导致作业不稳定。虽然 Hudi 支持通过 Rate Limit [3] 限制每分钟的数据写入来起到一定的平滑效果,但在作业稳定性和性能吞吐之间取得平衡的调优过程对于一般用户来说门槛也较高。

2.1 为什么选择 Flink Table Store

Apache Table Store [4] 作为 2022 年初开源的 Apache Flink 子项目,目标是打造一个支持更新的据湖存储,用于实时流式 Changelog 摄取和高性能查询。

🚀 大吞吐量的更新数据摄取,支持全增量一体入湖,一个 Flink 作业搞定所有
640 (2).png

Fig.3 Flink Table Store: 全量 + 增量一体化同步

回顾前文,我们知道全增量一体化同步入湖的主要挑战在于全量同步阶段产生了大量数据乱序引起的随机写入,导致性能回退、吞吐毛刺及不稳定。而 Table Store 的存储格式采用先分区(Partition)再分桶(Bucket),每个桶内各自维护一棵 LSM(Log-structured Merge Tree)的方式(参见 Fig.4、Fig.5),每条记录通过主键哈希落入桶内时就确定了写入路径(Directory),以 KV 方式写入 MemTable 中(类似于 HashMap,Key 就是主键,Value 是记录)。在 flush 到磁盘的过程中,以主键排序合并(去重),以追加方式写入磁盘。Sort Merge 在 buffer 内进行,避免了需要点查索引来判断一条记录是 insert 还是 update 来获取写入文件的 file group 的 tagging [5] 。另外,触发 MemTable flush 发生在 buffer 充满时,不需要额外通过 Auto-File Sizing [6](Auto-File Sizing 会影响写入速度 [7])来避免小文件产生,整个写入过程都是局部且顺序的 [8],避免了随机 IO 产生。

使用 Table Store 作为湖存储时,只需要一条 INSERT INTO 语句就可以完成全增量同步。以如下 SQL 为例,它展示了使用一个 Flink 流作业将 MySQL 数据库中的订单表通过 Streaming 方式写入 Table Store 表,并持续消费增量数据。

  1.     -- 创建并使用 table store catalog
  2.     CREATE CATALOG `table_store` WITH (
  3.         'type' = 'table-store',
  4.         'warehouse' = 'hdfs://foo/bar'
  5.     );
  6.     USE CATALOG `table_store`;
  7.     -- 定义 mysql-cdc source 表
  8.     CREATE TEMPORARY TABLE `orders_cdc` (
  9.       order_id BIGINT NOT NULL,
  10.       gmt_modified TIMESTAMP(3) NOT NULL,
  11.       ...
  12.       PRIMARY KEY (`order_id`) NOT ENFORCED
  13.     ) WITH (
  14.       'connector' = 'mysql-cdc',
  15.        ...
  16.     );
  17.     -- 定义 table store ods 表,按日期作分区
  18.     CREATE TABLE IF NOT EXISTS `orders` (
  19.       ...
  20.       PRIMARY KEY (`dt`, `order_id`) NOT ENFORCED
  21.     ) PARTITIONED BY (`dt`);
  22.     -- streaming 模式下提交作业
  23.     SET 'execution.runtime-mode' = 'streaming';
  24.     -- 设置 1min 的 cp interval,对应 1min 的数据新鲜度
  25.     SET 'execution.checkpointing.interval' = '1min';
  26.     -- 一条 SQL 同步全量 + 增量,动态分区写入
  27.     INSERT INTO `orders`
  28.     SELECT ..., DATE_FORMAT(gmt_modified, 'yyyy-MM-dd') AS dt
  29.     FROM `orders_cdc`;
复制代码

Fig.4 展示了以 dt 作为分区 orders 表的存储结构,在用户指定总 bucket 数 N 后,每个分区下会生成相应的 bucket-${n} 目录,每个目录下以列存格式(orc 或 parquet)存放 hash_func(pk) % N == ${n} 的记录文件。

640 (3).png
Fig.4 Flink Table Store 表的文件目录

元数据与数据存储在表的同一级目录下,包括 manifest 目录和 snapshot 目录。

manifest 目录下中记录了每次经 checkpoint 触发而提交的数据文件变更,包含新增和删除的数据文件

snapshot 目录下记录了每次提交产生的 snapshot 文件,内容包括为上一次提交产生的 manifest,加上本次提交产生的 manifest 作为增量

Fig.5 展示了 Fig.4 中每个 bucket 内 LSM 实现过程。我们以 Flink 流作业为例,在每次 checkpoint 时,Flink Table Store 会产生一次提交(commit),包含以下信息

生成当前 table 的一个快照(snapshot)。系统会通过 snapshot pointer file(类似于指针)追踪最早产生和当前最新的 snapshot 文件

snapshot 文件中包含了本次 commit 新增了哪些 manifest 文件、删除了哪些 manifest 文件

每个 manifest 文件中记录了产生了哪些 sst 文件、删除了哪些 sst 文件,以及每个 sst 文件所包含记录的主键范围、每个字段的 min/max/null count 等统计信息

每个 sst 文件则包含了按主键排好序的、列存格式的记录。对于 Level 0 的文件,Table Store 会异步地触发 compact 合并线程来消除主键范围重叠带来的读端 merge 开销

640 (4).png
Fig.5 Flink Table Store 表的 LSM 实现

在 Flink Table Store 0.2.0 发布时,我们测试了五亿条数据在一亿主键下的实时更新场景写入(包含插入与更新)并与 Apache Hudi MOR 及 COW 进行对比 [9], Table Store 在大规模实时更新写入场景拥有良好的写入性能。

🚀 高效 Data Skipping 支持过滤,提供高性能的点查和范围查询

虽然没有额外的索引,但是得益于 meta 的管理和列存格式,manifest 中保存了

文件的主键的 min/max 及每个字段的统计信息,这可以在不访问文件的情况下,进行一些 predicate 的过滤

orc/parquet 格式中,文件的尾部记录了稀疏索引,每个 chunk 的统计信息和 offset,这可以通过文件的尾部信息,进行一些 predicate 的过滤

数据在有 filter 读取时,可以根据上述信息做如下过滤

读取 manifest:根据文件的 min/max、分区,执行分区和字段的 predicate,淘汰多余的文件

读取文件 footer:根据 chunk 的 min/max,过滤不需要读取的 chunk

读取剩下与文件以及其中的 chunks

以上述订单表 orders 为例,当用户想要查询 dt = 2022-01-01 分区下所有 order_id 在 100 ~ 200 之间的订单时

SELECT * FROM orders WHERE dt = '2022-01-01' AND order_id >= 100 AND order_id <= 200;

Flink Table Store 会先根据 LATEST-SNAPSHOT 文件读到最近一次提交的 snapshot 文件(read committed),然后从 snapshot 中读取到对应 manifest meta 文件, 根据分区条件 dt='2022-01-01',过滤出包含这些分区的统计信息,由于统计信息里包含了每个 sst 文件 key 的范围,所以继续执行 order_id 在 [100, 200] 区间这个过滤条件,就能在 2022-01-01 这个目录下只读取对应的 sst 文件。

我们同样基于上述数据集测试了 Flink Table Store 的查询性能[9],在点查和范围查询的场景下,Flink Table Store 表现出众。从实现原理来说,MOR 的查询性能低于 COW、COW 的写入性能低于 MOR 是难以避免的。而在实践层面,在大规模写入场景下建立的 MOR 表也很难一键转换为 COW 来读取,所以在查询写入较多的表(MOR 表)这个前提下,Flink Table Store 的查询表现还是不俗的。

&#128640; 文件格式支持流读

Flink Table Store 实现了 Incremental Scan,在流模式下,可以持续监听文件更新,数据新鲜度保持在分钟级别,如下所示。

  1. -- 进入 SQL CLI,创建 catalog 和 table
  2. CREATE CATALOG table_store WITH (
  3.   'type' = 'table-store',
  4.   'warehouse' = 'file://foo/bar/' --或 'hdfs://foo/bar'
  5. );
  6. CREATE TABLE IF NOT EXIST my_table (
  7.   f0 INT,
  8.   f1 STRING,
  9.   PRIMARY KEY(f0) NOT ENFORCED
  10. );
  11. -- 切换到 batch 模式,写入数据
  12. SET 'execution.runtime-mode' = 'batch';
  13. INSERT INTO my_table VALUES(1, 'Hello');
  14. -- 新打开一个 SQL CLI 中,切换到 streaming 模式,提交流式查询
  15. SET 'execution.runtime-mode' = 'streaming';
  16. SET 'sql-client.execution.result-mode' = 'tableau';
  17. SELECT * FROM my_table;
  18. -- 可以读到结果如下
  19. +----+-------------+--------------------------------+
  20. | op |          f0 |                             f1 |
  21. +----+-------------+--------------------------------+
  22. | +I |           1 |                          Hello |
  23. -- 在第一个 SQL CLI 中,继续写入数据
  24. INSERT INTO my_table VALUES(1, 'Bye'), (2, '你好');
  25. -- 可以在第二个 SQL CLI 中,观察到新增输出 (-U, 1, Hello),(+U, 1, Bye)  和 (+I, 2, 你好)
  26. +----+-------------+--------------------------------+
  27. | op |          f0 |                             f1 |
  28. +----+-------------+--------------------------------+
  29. | +I |           1 |                          Hello |
  30. | -U |           1 |                          Hello |
  31. | +U |           1 |                            Bye |
  32. | +I |           2 |                           你好 |
复制代码

2.2 基于 TPC-H 数据集的全增量一体入湖 Demo

前文对 Flink Table Store 解决全增量一体入湖进行了简要分析,下面一个实例演示了如何在本地单机环境下,将近六千万条订单记录作为全量数据从 MySQL 同步到 Flink Table Store,并持续消费增量更新(由 TPC-H RF1 和 RF2 产生),下游接实时聚合及查询的过程。

640 (5).png

数据源由 TPC-H 自动生成并导入 MySQL ,运行在 Docker 容器内,本地只需要下载 Flink release 包和 Flink Table Store 依赖即可完成。

Demo 使用 lineitem 表中发货日期 l_shipdate 作为业务字段定义了二级分区 l_year 和 l_month,时间跨度从 1992.1 ~ 1998.12,即动态写入 84 个分区。 经测试,在本地单机并发为 2,checkpoint interval 为 1 min 的配置下(每分钟更新可见)46 min 内写入 59.9 million 全量数据,每 10 min 的写入性能如下表所示,平均写入性能在 1.3 million/min。

640 (6).png

详细配置如下所示

640 (7).png

详细步骤可查看 Apache Flink Table Store 全增量一体 CDC 实时入湖(https://github.com/LadyForest/fl ... update/README.zh.md

03 总结与展望

本文简要回顾了数据入湖(仓)的发展阶段,针对在数据库数据入湖中面临的问题,我们提出了使用 Flink Table Store 作为全增量一体入湖的解决方案,并辅以开源 Demo 的测试结果作为展示。







最新经典文章,欢迎关注公众号



---------------------

作者:Apache Flink
来源:weixin
原文:基于 Apache Flink Table Store 的全增量一体实时入湖



没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条