分享

B站基于Hudi+Flink打造流式数据湖的落地实践

问题导读:
1. B站数仓的现状和痛点是什么?
2. 针对典型的四个案例,对应的优化方案和收益是什么?
3. 介绍在批流融合方面,对数据湖基建和内核的针对性优化工作有哪些?
4. 未来的工作有哪些方向?




01 背景与挑战

2023-08-28_183641.jpg

上图展示了当前B站实时数仓的一个简略架构,大致可以分为采集传输层、数据处理层,以及最终的AI和BI应用层。为保证稳定性,数据处理层是由以实时为主,以离线兜底的两条链路组成,即我们熟知的批流双链路。

在实践落地的过程中,上述上架构存在以下问题:

    首先,从架构视角,批流双链路对应不同的存储和计算组件,维护和资源成本高;

    其次,从用户视角,实时链路观测性较差,离线链路时效性不足;

    第三,数据孤岛,数据应用层一般都要基于多种仓外组件流转,数据管理上存在断层;

    最后,查询效率低,若不依赖OLAP组件,就无法满足业务方高效的数据分析需求。

2023-08-28_184824.jpg


为了解决上述困境,我们引入了数据湖的构建。如上图,是我们构建数据湖的能力愿景,也是落地的实践路径。

首先,支持高效的数据流转,比如实时数据入湖,流量日志动态分流,以及数据模型层的湖上流式构建能力,如Join、维表等。

其次,批流融合能力,在底层内核、架构、平台工具等打通批流一体生产,支持流-批调度,多任务并发更新等。

第三,统一的数据管理,包括统一元数据服务、强大的数据湖自治,表服务自适应管理,湖上视图管理等。

最后,便捷的分析查询,各种湖查询加速能力建设,包括Clustering加速、索引加速、预计算物化加速,Alluxio缓存加速等。


02典型场景案例

下面会针对四个典型案例进行展开:RDB一键入湖、流量日志分流、物化查询加速,以及实时数仓演进。

1. RDB一键入湖
2023-08-28_184906.jpg


为了提升数据集成的时效性,我们将原基于Datax+Hive的方案,成功改造为CDC+Hudi的方案。上图是一个简略的CDC+Hudi同步方案示意。RDB先通过Flink CDC Job同步至内部Kafka缓冲供下游订阅,然后1个逻辑表会对应1个Hudi Sync任务,再同步至Hudi表。

在具体落地过程中,我们解决了乱序、Schema Evolution、数据断流推进等问题,本文在此不做展开,将重点讨论批流融合的痛点。

2023-08-28_184940.jpg

以往基于批同步后,业务方将获得一个全量或者增量数据分片,即数仓里的一个分区。如上图右侧示例,SQL只需写log_date进行过滤就可指定对应分片。

升级至实时入湖方案之后,在切换过程中会有以下两个痛点:

一是分片的时间界限模糊导致切换有感,需用户主动过滤漂移数据,比如基于event time,且SQL上的过滤只能下推至Merge后数据,对CDC Merge前的变更流不生效;

二是由于数据实时变更,历史分区会随时被Upsert,流转批后的离线ETL任务无法获得稳定重跑链路。

对以上问题业界有些潜在方案,一种是通过脚本,从Hudi表导出到Hive表来实现快照,但会导致使用割裂和架构冗余;另一种是基于Savepoint的方案,在Commit时会触发Savepoint,但并未解决漂移问题。

2023-08-28_185009.jpg

我们的优化方案是基于Hudi Snapshot View快照视图,并支持在多种引擎上的适配。

如上图所示意,基于Hudi支持了带过滤谓词下推的分区快照视图,以实现具备准确切分的逻辑分区。视图实时创建之后,会rewrite生成一个Predicated File Slice,将Expression下推至Log Merge之前,对漂移数据准确过滤。

数据文件是基于Hudi Meta进行映射的,没有冗余的存储。快照视图上也支持独立的Compaction/Clustering/Clean等表服务,对视图物化、加速或过期等。

2023-08-28_185034.jpg

在该方案里,一张表里同时存在实时分区、增量快照分区以及全量快照分区,该如何进行管理?

如前文所述,快照视图也会有表服务,所以直接新增一个Action,无法满足需求。我们引入了一种新的视图管理手段,该方案是基于独立的Timeline来实现。如上图右下方所示,新增的Snapshot Timeline实现了类似于Git的能力,比如,支持branch或者tag的创建或者删除,快照切换等。

在分区视图场景中,通过轻量的checkout操作,就能够实现实时、全量以及增量分区的便捷切换,视图的Compaction/Clustering/Clean等表服务,也在各自Timeline上独立管理。

2023-08-28_185102.jpg


在写入和查询阶段,如何对引擎进行适配?

写入侧,比较重要的是Snapshot View的生成时机。我们基于分区提交来确认数据到位,同时触发快照生成。而分区提交的时机,则是基于Watermark的分区推进机制,这块在下文内核优化部分再展开介绍。

查询侧,目前已支持Flink Batch 、Spark和Hive引擎对快照视图查询,用户在原有SQL基础上,只需加上hint声明查询模式是增量或全量的,即可访问对应的分区视图。具体实现,是在Hudi里面新增了SnapshotMetaClient,用来指向TableMetaClient,基于前面实现的Timeline管理机制,动态指向到对应分区视图。

最终的收益主要是降本增效。降本方面,相当于一张Hudi表里,每个分区只存有增量的数据,但同时实现一个全量分区、增量分区以及实时分区,大幅降低了存储成本。增效方面,数据时效提升到分钟级,且hint或option的机制,使用户基本没有切换成本。

2. 流量日志分流

2023-08-28_185132.jpg

流量日志分流是一个常见业务场景。我们公司内部有日千亿级的埋点日志,包含1w+事件分类,产出供全站各个BU使用。如上图所示,之前已有基于Flink+Hive的生成链路,其主要痛点如下:

    首先,时效性不足,小时级产出数据,无法满足下游分钟级时效诉求。

    其次,稳定性不足,从传输层到ODS层,最后分流到DWD层,仅靠一条流产出。里面包含主站、直播、游戏等各个BU的数据,业务隔离性较差。

    第三,基于业务类型的分流方式不够灵活,比如HDFS分流,主要产出是物理分区,除归档日期外,还可能带有业务类型(比如event id)分流,导致下游通知极其复杂。由于事件类型过多,只能按照事件组分区,下游使用时仍需主动过滤无用数据,有大量重复IO。另外,由于各BU数据在下游使用时会交叉订阅,固定的分区也会导致混乱的数据权限管理。

2023-08-28_185207.jpg

我们的解决方案,如上图所示,主要包含三个方面:

    首先,Hudi Append替换Hive,使下游天然支持基于Hudi的增量消费,达到分钟级数据时效。

    其次,传输层的分流优化,从平台边缘开始,按照BU进行动态规则分流,以单流单job传输到ODS层,增强隔离性和稳定性。

    最后,仓内的分流优化,从传统的物理分区分流,改为逻辑分区分流。具体做法是,除保留归档日期作为物理分区外,原业务分区字段退化为普通字段。DWD Hudi后新增一层View,定义下游订阅逻辑,以解决数据权限管理问题。View中按原业务分区字段过滤相当于逻辑分区过滤,基于Hudi外挂的Clustering Job对其进行排序重分布,通过Hudi Dataskip加速。

2023-08-28_185235.jpg

如上图,是新老方案的查询性能对比,在大部分场景下,新方案表现更优,整体方案达到预期要求,同时也简化了整个流量日志的分流架构。

3. 物化查询加速

2023-08-28_185302.jpg

通常,在数据生产的末端进行查询时,面临如下痛点:

    数据在ADS层需出仓到Mysql或者ClickHouse,面临出仓后管理断层的问题。

    末端查询分析时,一般是聚合查询,有严重的IO放大。

    运维和开发成本比较高。比如,要新增一个新维度下的实时聚合指标,业务端需要对应新增一个实时任务。若生产任务出现异常,对应指标将不可用,稳定性较差。

针对上述痛点,我们通过Flink物化视图支持与Hudi增量计算,实现了指标预计算。

如上图,用户可以通过hint标记子查询或主动创建物化视图,在后台构建起托管的指标物化任务。它增量消费Hudi源表,将物化结果写入Hudi Upsert表。查询时,如果被Flink BatchPlanner命中,将直接查询物化表,提升了查询时效性。此外,该方案对稳定性也有极大提升,若物化任务异常,可降级到源表查询。

2023-08-28_185330.jpg

如上图,此处简要介绍下实现:

首先,对Flink支持了物化视图,并在BatchPlanner里,新增了物化解析规则和管理。

其次,Hudi表TableMeta新增物化路由的索引,并在写入端,支持commit时记录watermark在InstantMeta中,作为进度暴露给查询端。

2023-08-28_185402.jpg

在Hudi支持Flink Batch在OLAP场景中的查询响应上,我们也做了很多优化。比如组件缓存,通过metaclient、文件索引等复用,减少了元数据加载耗时。通过线程池并行加载、文件索引异步预加载、list合并、本地性优化等手段,实现了对Split的生成加速。基于文件索引,可对查询的并行度动态推算等。

对源表,我们已支持了Clustering和索引加速。对物化Upsert表,也支持了对历史数据Clustering。此外,基于Alluxio,可同时对物化表和源表进行缓存加速。

4. 实时数仓演进
2023-08-28_185433.jpg

下面介绍下实时数仓演进,上图是开篇提到的实时数仓架构。前文介绍的3个典型案例方案,重点关注了其中的RDB实时入湖、流量日志动态分流、指标查询加速等痛点,我们也尝试寻找其他痛点场景的解决方案。

2023-08-28_185501.jpg

如上图,融合了新方案后的架构,是个混合增量数仓,在探索的新场景主要包括三个方面:

    首先,Hudi替换原Kafka的MQ场景,在计算口径上和流批存储上统一,以实现降本。

    其次,实现近实时DQC,以前实时链路需要dump到HDFS上才可观测,替换Hudi表后可统一实时离线DQC。

    第三,Hudi替换Mysql,实现数据不出仓BI直连。基于前文的指标查询加速,分析和报表两种场景将基于同一张Hudi表,同一套Flink SQL。

03基建与内核


接下来介绍在批流融合方面对基建和内核的优化,包括:TableService的优化、分区推进支持,以及数据回滚的增强。

1. TableService优化

2023-08-28_185534.jpg

如上图,是一个含内嵌表服务的Hudi写入作业,该架构有以下痛点:

    首先,稳定性比较差,资源利用率比较低,因为compaction/clustering等表服务与流式写入相比,其批流特征有明显区别,会互相干扰。

    其次,策略灵活度差,在调整完表服务策略之后,需要对写入作业重启才可生效。

    第三,缺乏平台化托管能力,用户需自行配置,门槛较高。

2023-08-28_185600.jpg

我们的优化方案是通过表服务的外挂模式,加上Hudi Manager进行托管,上图是整体架构:

首先,对所有表服务都支持调度-执行剥离,并且支持动态规则刷新;

其次,引入Hudi Manager作为中控平台,支持Hudi表生命周期管理、表服务的策略化运营、资源与任务调度等。

目前支持了社区的所有表服务,以及自研的物化表服务。

2. 分区推进支持
2023-08-28_185628.jpg

当前社区的Hive Sync,主要聚焦于分区同步,而非分区推进。而在批流融合过程中,尤其是流转批时,下游调度通知尤为重要。此外,分区推进问题,也关系到如何在同一张表中,协同好用户实时分析和调度ETL两种场景。

我们的方案是基于Watermark的分区推进机制。

首先,分区推进会被分成两步提交,第一步是arrival commit,在数据第一次写入该分区时commit,第二步是ready commit,当watermark到达了预设值之后,再次进行commit。

其次,对Hive MetaStore拓展,在Partiiton中新增commit属性,arrival和ready两次commit分别对应false和true,以此标记分区是否完整提交。

为避免因任务重启等问题导致错乱,分区推进状态会以PartitionState形式存在Flink State中。每次提交的分区,将根据write status、watermark和状态来生成,确保其一致性。

2023-08-28_185655.jpg

下面介绍下查询端对分区推进机制的适配。

若在调度ETL场景,默认情况下,用户可查到所有ready commit分区下面的数据文件。

若用户需要实时分析,只要指定include commit=true,就可查到所有当前arrival commit的分区,即所有数据。

两者在SQL层面没有任何区别,只需要设置参数或者hint即可。

3. 数据回滚增强

2023-08-28_185723.jpg

回滚能力对于数据湖的生产落地保障非常重要,可以大致分成两部分,一个是业务数据回滚,另一个是元数据异常运维。

在业务数据回滚方面,以前基于Flink流式写入,都会采用Spark批量修复,流批SQL的不统一,无法做到真正的批流融合。另外,基于Kafka的实时链路,基本上不具备修复能力。

基于Hudi+Flink的方案后,我们做了以下的工作:

     首先,增强Hudi回滚能力,引入了基于文件锁的并发更新机制。

    其次,以Flink Batch替代了Spark。

    最后,把回滚方案集成到平台,支持用户一键重跑。

2023-08-28_185747.jpg


Hudi元数据的修复,可能会由多种原因引起。比如,因为一些未知问题,导致了从某时刻开始出现元数据状态跟数据文件不一致。又或者,若HDFS出现了坏块,导致了archived timeline、某些instant、某些数据文件损坏,若不运维就会扩大影响至整张Hudi表不可用。

我们的方案,以Instant Rollback为主要手段,以Savepoint Rollback作为兜底,通过Spark Procedure来接入。

为什么是需要两种rollback来结合呢?如上图,列举两种case来说明。

如果archived timeline文件损坏,只单纯地rollback到某个instant将无法修复,因为查询archived timeline仍不可避免,此时可rollback至最近可用的savepoint,再重新恢复写入作业。

如果仅某个instant损坏,则rollback到最近可用的instant即可。若可用instant都已被archived,则rollback至最近可用的savepoint。然后,再重新恢复写入作业。

对于savepoint,将作为一个托管的表服务,基于前文提到Hudi Manager周期性生成和过期,以确保一直存在可用版本。

04未来工作展望

最后,我简略介绍一下对未来工作的展望。

首先,数据湖内核能力增强,包括数据流转能力拓展(如维表)、无锁并发更新、查询加速等。

其次,进一步完善数据湖基建,统一Metastore,Hudi Manager提升管理与运营能力等。

第三,流批一体场景的落地,包括搜推广场景推进,湖上数据模型构建场景渗透等。

最后,在平台化方面,支持流批服务的打通,持续提升用户体验。


作者:陈世治
来源:https://mp.weixin.qq.com/s/mR7tqStpPu5TtWzVRq9iKg

最新经典文章,欢迎关注公众号


没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条