分享

Apache Flink CDC 批流融合技术原理分析


问题导读:

1、怎样使用Flink CDC实现全量读取 + 增量读取 Mysql 表数据?
2、怎样使用Flink CDC实现切片划分、切分读取?
3、怎样调用flink-mysql-cdc 接口?




8 月份 Flink CDC 发布 2.0.0 版本,相较于 1.0 版本,在全量读取阶段支持分布式读取、支持 checkpoint,且在全量 + 增量读取的过程在不锁表的情况下保障数据一致性。 详细介绍参考 https://flink-learning.org.cn/ar ... 1c4d5eeb75a141d9e1e

Flink CDC 2.0 数据读取逻辑并不复杂,复杂的是 https://cwiki.apache.org/conflue ... or+Source+Interface 的设计及对 Debezium Api 的不了解。本文重点对 Flink CDC 的处理逻辑进行介绍, https://cwiki.apache.org/conflue ... or+Source+Interface 的设计及 Debezium 的 API 调用不做过多讲解。

本文使用 CDC 2.0.0 版本,先以 Flink SQL 案例来介绍 Flink CDC 2.0 的使用,接着介绍 CDC 中的核心设计包含切片划分、切分读取、增量读取,最后对数据处理过程中涉及 flink-mysql-cdc 接口的调用及实现进行代码讲解。

一、案例
全量读取 + 增量读取 Mysql 表数据,以changelog-json 格式写入 kafka,观察 RowKind 类型及影响的数据条数。

  1. public static void main(String[] args) {
  2.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  3.         EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
  4.                 .useBlinkPlanner()
  5.                 .inStreamingMode()
  6.                 .build();
  7.         env.setParallelism(3);
  8.         // note: 增量同步需要开启CK
  9.         env.enableCheckpointing(10000);
  10.         StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env, envSettings);
  11.             
  12.         tableEnvironment.executeSql(" CREATE TABLE demoOrders (\n" +
  13.                 "         `order_id` INTEGER ,\n" +
  14.                 "          `order_date` DATE ,\n" +
  15.                 "          `order_time` TIMESTAMP(3),\n" +
  16.                 "          `quantity` INT ,\n" +
  17.                 "          `product_id` INT ,\n" +
  18.                 "          `purchaser` STRING,\n" +
  19.                 "           primary key(order_id)  NOT ENFORCED" +
  20.                 "         ) WITH (\n" +
  21.                 "          'connector' = 'mysql-cdc',\n" +
  22.                 "          'hostname' = 'localhost',\n" +
  23.                 "          'port' = '3306',\n" +
  24.                 "          'username' = 'cdc',\n" +
  25.                 "          'password' = '123456',\n" +
  26.                 "          'database-name' = 'test',\n" +
  27.                 "          'table-name' = 'demo_orders'," +
  28.                             //  全量 + 增量同步   
  29.                 "          'scan.startup.mode' = 'initial'      " +
  30.                 " )");
  31.               tableEnvironment.executeSql("CREATE TABLE sink (\n" +
  32.                 "         `order_id` INTEGER ,\n" +
  33.                 "          `order_date` DATE ,\n" +
  34.                 "          `order_time` TIMESTAMP(3),\n" +
  35.                 "          `quantity` INT ,\n" +
  36.                 "          `product_id` INT ,\n" +
  37.                 "          `purchaser` STRING,\n" +
  38.                 "          primary key (order_id)  NOT ENFORCED " +
  39.                 ") WITH (\n" +
  40.                 "    'connector' = 'kafka',\n" +
  41.                 "    'properties.bootstrap.servers' = 'localhost:9092',\n" +
  42.                 "    'topic' = 'mqTest02',\n" +
  43.                 "    'format' = 'changelog-json' "+
  44.                 ")");
  45.              tableEnvironment.executeSql("insert into sink select * from demoOrders");}
复制代码

全量数据输出:

  1. {"data":{"order_id":1010,"order_date":"2021-09-17","order_time":"2021-09-22 10:52:12.189","quantity":53,"product_id":502,"purchaser":"flink"},"op":"+I"}
  2. {"data":{"order_id":1009,"order_date":"2021-09-17","order_time":"2021-09-22 10:52:09.709","quantity":31,"product_id":500,"purchaser":"flink"},"op":"+I"}
  3. {"data":{"order_id":1008,"order_date":"2021-09-17","order_time":"2021-09-22 10:52:06.637","quantity":69,"product_id":503,"purchaser":"flink"},"op":"+I"}
  4. {"data":{"order_id":1007,"order_date":"2021-09-17","order_time":"2021-09-22 10:52:03.535","quantity":52,"product_id":502,"purchaser":"flink"},"op":"+I"}
  5. {"data":{"order_id":1002,"order_date":"2021-09-17","order_time":"2021-09-22 10:51:51.347","quantity":69,"product_id":503,"purchaser":"flink"},"op":"+I"}
  6. {"data":{"order_id":1001,"order_date":"2021-09-17","order_time":"2021-09-22 10:51:48.783","quantity":50,"product_id":502,"purchaser":"flink"},"op":"+I"}
  7. {"data":{"order_id":1000,"order_date":"2021-09-17","order_time":"2021-09-17 17:40:32.354","quantity":30,"product_id":500,"purchaser":"flink"},"op":"+I"}
  8. {"data":{"order_id":1006,"order_date":"2021-09-17","order_time":"2021-09-22 10:52:01.249","quantity":31,"product_id":500,"purchaser":"flink"},"op":"+I"}
  9. {"data":{"order_id":1005,"order_date":"2021-09-17","order_time":"2021-09-22 10:51:58.813","quantity":69,"product_id":503,"purchaser":"flink"},"op":"+I"}
  10. {"data":{"order_id":1004,"order_date":"2021-09-17","order_time":"2021-09-22 10:51:56.153","quantity":50,"product_id":502,"purchaser":"flink"},"op":"+I"}
  11. {"data":{"order_id":1003,"order_date":"2021-09-17","order_time":"2021-09-22 10:51:53.727","quantity":30,"product_id":500,"purchaser":"flink"},"op":"+I"}
复制代码

修改表数据,增量捕获:

  1. ## 更新 1005 的值
  2. {"data":{"order_id":1005,"order_date":"2021-09-17","order_time":"2021-09-22 02:51:58.813","quantity":69,"product_id":503,"purchaser":"flink"},"op":"-U"}
  3. {"data":{"order_id":1005,"order_date":"2021-09-17","order_time":"2021-09-22 02:55:43.627","quantity":80,"product_id":503,"purchaser":"flink"},"op":"+U"}
  4. ## 删除 1000
  5. {"data":{"order_id":1000,"order_date":"2021-09-17","order_time":"2021-09-17 09:40:32.354","quantity":30,"product_id":500,"purchaser":"flink"},"op":"-D"}
复制代码

二、核心设计

1. 切片划分

全量阶段数据读取方式为分布式读取,会先对当前表数据按主键划分成多个Chunk,后续子任务读取Chunk 区间内的数据。根据主键列是否为自增整数类型,对表数据划分为均匀分布的Chunk及非均匀分布的Chunk。

1.1 均匀分布

主键列自增且类型为整数类型(int,bigint,decimal)。查询出主键列的最小值,最大值,按 chunkSize 大小将数据均匀划分,因为主键为整数类型,根据当前chunk 起始位置、chunkSize 大小,直接计算 chunk 的结束位置。

注意:最新版本均匀分布的触发条件不再依赖主键列是否自增,要求主键列卫整数类型且根据 max(id) - min(id)/rowcount 计算出数据分布系数,只有分布系数 <= 配置的分布系数 (evenly-distribution.factor 默认为 1000.0d) 才会进行数据均匀划分。

  1. //  计算主键列数据区间
  2. select min(`order_id`), max(`order_id`) from demo_orders;
  3. //  将数据划分为 chunkSize 大小的切片
  4. chunk-0: [min,start + chunkSize)
  5. chunk-1: [start + chunkSize, start + 2chunkSize)
  6. .......
  7. chunk-last: [max,null)
复制代码

1.2 非均匀分布

主键列非自增或者类型为非整数类型。主键为非数值类型,每次划分需要对未划分的数据按主键进行升序排列,取出前 chunkSize 的最大值为当前 chunk 的结束位置。

注意:最新版本非均匀分布触发条件为主键列为非整数类型,或者计算出的分布系数 (distributionFactor) > 配置的分布系数 (evenly-distribution.factor)。

  1. // 未拆分的数据排序后,取 chunkSize 条数据取最大值,作为切片的终止位置。
  2. chunkend = SELECT MAX(`order_id`) FROM (
  3.         SELECT `order_id`  FROM `demo_orders`
  4.         WHERE `order_id` >= [前一个切片的起始位置]
  5.         ORDER BY `order_id` ASC
  6.         LIMIT   [chunkSize]  
  7.     ) AS T
复制代码

2. 全量切片数据读取

Flink 将表数据划分为多个 Chunk,子任务在不加锁的情况下,并行读取 Chunk 数据。因为全程无锁在数据分片读取过程中,可能有其他事务对切片范围内的数据进行修改,此时无法保证数据一致性。因此,在全量阶段 Flink 使用快照记录读取 + Binlog 数据修正的方式来保证数据的一致性。

2.1 快照读取

通过 JDBC 执行 SQL 查询切片范围的数据记录。

  1. ## 快照记录数据读取SQL
  2. SELECT * FROM `test`.`demo_orders`
  3. WHERE order_id >= [chunkStart]
  4. AND NOT (order_id = [chunkEnd])
  5. AND order_id <= [chunkEnd]
复制代码

2.2 数据修正

在快照读取操作前、后执行 SHOW MASTER STATUS 查询 binlog 文件的当前偏移量,在快照读取完毕后,查询区间内的 binlog 数据并对读取的快照记录进行修正。

快照读取 + Binlog 数据读取时的数据组织结构:

O1CN01mUtwju1p6UDuxMy1C_!!6000000005311-2-tps-1080-50.png

BinlogEvents 修正 SnapshotEvents 规则。

  • 未读取到 binlog 数据,即在执行 select 阶段没有其他事务进行操作,直接下发所有快照记录。
  • 读取到 binlog 数据,且变更的数据记录不属于当前切片,下发快照记录。
  • 读取到 binlog 数据,且数据记录的变更属于当前切片。delete 操作从快照内存中移除该数据,insert 操作向快照内存添加新的数据,update 操作向快照内存中添加变更记录,最终会输出更新前后的两条记录到下游。

修正后的数据组织结构:

O1CN01POsozI1WMXPFo2J0W_!!6000000002774-2-tps-1080-93.png

以读取切片 [1,11] 范围的数据为例,描述切片数据的处理过程。c、d、u 代表 Debezium 捕获到的新增、删除、更新操作。

修正前数据及结构:

O1CN01K13WwI1Ml8WLD8TOL_!!6000000001474-2-tps-1080-48.png

修正后数据及结构:

O1CN015p15ai1a4SE7uq2u1_!!6000000003276-2-tps-1080-54.png

单个切片数据处理完毕后会向 SplitEnumerator 发送已完成切片数据的起始位置(ChunkStart, ChunkStartEnd)、Binlog 的最大偏移量(High watermark),用来为增量读取指定起始偏移量。

3. 增量切片数据读取

全量阶段切片数据读取完成后,SplitEnumerator 会下发一个 BinlogSplit 进行增量数据读取。BinlogSplit 读取最重要的属性就是起始偏移量,偏移量如果设置过小下游可能会有重复数据,偏移量如果设置过大下游可能是已超期的脏数据。而 Flink CDC 增量读取的起始偏移量为所有已完成的全量切片最小的Binlog 偏移量,只有满足条件的数据才被下发到下游。数据下发条件:

  • 捕获的 Binlog 数据的偏移量 > 数据所属分片的 Binlog 的最大偏移量。


例如,SplitEnumerator 保留的已完成切片信息为:

切片索引 Chunk 数据范围 切片读取的最大Binlog
0[1,100] 1000
1[101,200] 800
2[201,300] 1500

增量读取时,从偏移量 800 开始读取 Binlog 数据 ,当捕获到数据 <data:123, offset:1500> 时,先找到 123 所属快照分片,并找到对应的最大 Binlog 偏移量 800。 当前偏移量大于快照读的最大偏移量,则下发数据,否则直接丢弃。

三、代码详解

关于 https://cwiki.apache.org/conflue ... or+Source+Interface 设计不做详细介绍,本文侧重对 flink-mysql-cdc 接口调用及实现进行讲解。

1. MySqlSourceEnumerator 初始化

SourceCoordinator 作为 OperatorCoordinator 对 Source 的实现,运行在 Master 节点,在启动时通过调用 MySqlParallelSource#createEnumerator 创建 MySqlSourceEnumerator 并调用 start 方法,做一些初始化工作。

O1CN01qYATvG23Dj6VqaR77_!!6000000007222-2-tps-1080-634.png

(1)创建 MySqlSourceEnumerator,使用 MySqlHybridSplitAssigner 对全量+增量数据进行切片,使用 MySqlValidator 对 mysql 版本、配置进行校验。

(2)MySqlValidator 校验:

  • mysql 版本必须大于等于 5.7。
  • binlog_format 配置必须为 ROW。
  • binlog_row_image 配置必须为 FULL。

(3)MySqlSplitAssigner 初始化:

  • 创建 ChunkSplitter 用来划分切片。
  • 筛选出要读的表名称。


启动周期调度线程,要求 SourceReader 向 SourceEnumerator 发送已完成但未发送 ACK 事件的切片信息。

  1. private void syncWithReaders(int[] subtaskIds, Throwable t) {
  2. if (t != null) {
  3.      throw new FlinkRuntimeException("Failed to list obtain registered readers due to:", t);
  4. }
  5. // when the SourceEnumerator restores or the communication failed between
  6. // SourceEnumerator and SourceReader, it may missed some notification event.
  7. // tell all SourceReader(s) to report there finished but unacked splits.
  8. if (splitAssigner.waitingForFinishedSplits()) {
  9.      for (int subtaskId : subtaskIds) {
  10.          // note: 发送 FinishedSnapshotSplitsRequestEvent
  11.          context.sendEventToSourceReader(
  12.                  subtaskId, new FinishedSnapshotSplitsRequestEvent());
  13.      }
  14. }
  15. }
复制代码

2. MySqlSourceReader 初始化

SourceOperator 集成了 SourceReader,通过OperatorEventGateway 和 SourceCoordinator 进行交互。

O1CN01PLZqhv1pfIK0XeC4A_!!6000000005387-2-tps-1080-666.png

SourceOperator 在初始化时,通过 MySqlParallelSource 创建 MySqlSourceReader。MySqlSourceReader 通过 SingleThreadFetcherManager 创建 Fetcher 拉取分片数据,数据以 MySqlRecords 格式写入到 elementsQueue。

  1. ```sql MySqlParallelSource#createReader
  2. public SourceReader<T, MySqlSplit> createReader(SourceReaderContext readerContext) throws Exception { // note: 数据存储队列 FutureCompletingBlockingQueue<RecordsWithSplitIds> elementsQueue = new FutureCompletingBlockingQueue<>(); final Configuration readerConfiguration = getReaderConfig(readerContext);
  3. // note: Split Reader 工厂类
  4. Supplier splitReaderSupplier = () -> new MySqlSplitReader(readerConfiguration, readerContext.getIndexOfSubtask());
  5. return new MySqlSourceReader<>( elementsQueue, splitReaderSupplier, new MySqlRecordEmitter<>(deserializationSchema), readerConfiguration, readerContext); }
复制代码

将创建的 MySqlSourceReader 以事件的形式传递给 SourceCoordinator 进行注册。SourceCoordinator 接收到注册事件后,将 reader 地址及索引进行保存。

  1. ```sql
复制代码

MySqlSourceReader 启动后会向 MySqlSourceEnumerator 发送请求分片事件,从而收集分配的切片数据。

SourceOperator 初始化完毕后,调用 emitNext 由 SourceReaderBase 从 elementsQueue 获取数据集合并下发给 MySqlRecordEmitter。接口调用示意图:

O1CN01Fh2VTV2AAhnpJMOB1_!!6000000008163-2-tps-1080-459.png

3. MySqlSourceEnumerator 处理分片请求

MySqlSourceReader 启动时会向 MySqlSourceEnumerator 发送请求 RequestSplitEvent 事件,根据返回的切片范围读取区间数据。MySqlSourceEnumerator 全量读取阶段分片请求处理逻辑,最终返回一个 MySqlSnapshotSplit。

O1CN01pIsFxG1RgchAqtCY6_!!6000000002141-2-tps-1080-316.png

处理切片请求事件,为请求的 Reader 分配切片,通过发送 AddSplitEvent 时间传递 MySqlSplit (全量阶段MySqlSnapshotSplit、增量阶段 MySqlBinlogSplit)。

  1. ```sql MySqlSourceEnumerator#handleSplitRequest public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) { if (!context.registeredReaders().containsKey(subtaskId)) {
  2. // reader failed between sending the request and now. skip this request.
  3. return;
  4. } // note: 将reader所属的subtaskId存储到TreeSet, 在处理binlog split时优先分配个task-0 readersAwaitingSplit.add(subtaskId);
  5. assignSplits(); }
  6. // note: 分配切片 private void assignSplits() { final Iterator awaitingReader = readersAwaitingSplit.iterator(); while (awaitingReader.hasNext()) { int nextAwaiting = awaitingReader.next(); // if the reader that requested another split has failed in the meantime, remove // it from the list of waiting readers if (!context.registeredReaders().containsKey(nextAwaiting)) { awaitingReader.remove(); continue; }
  7.     //note: 由 MySqlSplitAssigner 分配切片
  8.     Optional<MySqlSplit> split = splitAssigner.getNext();
  9.     if (split.isPresent()) {
  10.         final MySqlSplit mySqlSplit = split.get();
  11.         //  note: 发送AddSplitEvent, 为 Reader 返回切片信息
  12.         context.assignSplit(mySqlSplit, nextAwaiting);
  13.         awaitingReader.remove();
  14.         LOG.info("Assign split {} to subtask {}", mySqlSplit, nextAwaiting);
  15.     } else {
  16.         // there is no available splits by now, skip assigning
  17.         break;
  18.     }
  19. }
  20. }
复制代码

MySqlHybridSplitAssigner 处理全量切片、增量切片的逻辑。
   
  • 任务刚启动时,remainingTables 不为空,noMoreSplits 返回值为false,创建 SnapshotSplit。
  • 全量阶段分片读取完成后,noMoreSplits 返回值为true,  创建 BinlogSplit。


  1. ```sql
  2. MySqlHybridSplitAssigner#getNext
  3. @Override
  4. public Optional<MySqlSplit> getNext() {
  5.     if (snapshotSplitAssigner.noMoreSplits()) {
  6.         // binlog split assigning
  7.         if (isBinlogSplitAssigned) {
  8.             // no more splits for the assigner
  9.             return Optional.empty();
  10.         } else if (snapshotSplitAssigner.isFinished()) {
  11.             // we need to wait snapshot-assigner to be finished before
  12.             // assigning the binlog split. Otherwise, records emitted from binlog split
  13.             // might be out-of-order in terms of same primary key with snapshot splits.
  14.             isBinlogSplitAssigned = true;
  15.             //note: snapshot split 切片完成后,创建BinlogSplit。
  16.             return Optional.of(createBinlogSplit());
  17.         } else {
  18.             // binlog split is not ready by now
  19.             return Optional.empty();
  20.         }
  21.     } else {
  22.         // note: 由MySqlSnapshotSplitAssigner 创建 SnapshotSplit
  23.         // snapshot assigner still have remaining splits, assign split from it
  24.         return snapshotSplitAssigner.getNext();
  25.     }
  26. }
复制代码

MySqlSnapshotSplitAssigner 处理全量切片逻辑,通过 ChunkSplitter 生成切片,并存储到 Iterator 中。

  1. @Override
  2. public Optional<MySqlSplit> getNext() {
  3. if (!remainingSplits.isEmpty()) {
  4.      // return remaining splits firstly
  5.      Iterator<MySqlSnapshotSplit> iterator = remainingSplits.iterator();
  6.      MySqlSnapshotSplit split = iterator.next();
  7.      iterator.remove();
  8.      
  9.      //note: 已分配的切片存储到 assignedSplits 集合
  10.      assignedSplits.put(split.splitId(), split);
  11.      return Optional.of(split);
  12. } else {
  13.      // note: 初始化阶段 remainingTables 存储了要读取的表名
  14.      TableId nextTable = remainingTables.pollFirst();
  15.      if (nextTable != null) {
  16.          // split the given table into chunks (snapshot splits)
  17.          //  note: 初始化阶段创建了 ChunkSplitter,调用generateSplits 进行切片划分
  18.          Collection<MySqlSnapshotSplit> splits = chunkSplitter.generateSplits(nextTable);
  19.          //  note: 保留所有切片信息
  20.          remainingSplits.addAll(splits);
  21.          //  note: 已经完成分片的 Table
  22.          alreadyProcessedTables.add(nextTable);
  23.          //  note: 递归调用该该方法
  24.          return getNext();
  25.      } else {
  26.          return Optional.empty();
  27.      }
  28. }
  29. }
复制代码

ChunkSplitter 将表划分为均匀分布 or 不均匀分布切片的逻辑。读取的表必须包含物理主键。

  1. public Collection<MySqlSnapshotSplit> generateSplits(TableId tableId) {
  2. Table schema = mySqlSchema.getTableSchema(tableId).getTable();
  3. List<Column> primaryKeys = schema.primaryKeyColumns();
  4. // note: 必须有主键
  5. if (primaryKeys.isEmpty()) {
  6.      throw new ValidationException(
  7.              String.format(
  8.                      "Incremental snapshot for tables requires primary key,"
  9.                              + " but table %s doesn't have primary key.",
  10.                      tableId));
  11. }
  12. // use first field in primary key as the split key
  13. Column splitColumn = primaryKeys.get(0);
  14. final List<ChunkRange> chunks;
  15. try {
  16.       // note: 按主键列将数据划分成多个切片
  17.      chunks = splitTableIntoChunks(tableId, splitColumn);
  18. } catch (SQLException e) {
  19.      throw new FlinkRuntimeException("Failed to split chunks for table " + tableId, e);
  20. }
  21. //note: 主键数据类型转换、ChunkRange 包装成MySqlSnapshotSplit。
  22. // convert chunks into splits
  23. List<MySqlSnapshotSplit> splits = new ArrayList<>();
  24. RowType splitType = splitType(splitColumn);
  25. for (int i = 0; i < chunks.size(); i++) {
  26.      ChunkRange chunk = chunks.get(i);
  27.      MySqlSnapshotSplit split =
  28.              createSnapshotSplit(
  29.                      tableId, i, splitType, chunk.getChunkStart(), chunk.getChunkEnd());
  30.      splits.add(split);
  31. }
  32. return splits;
  33. }
复制代码

splitTableIntoChunks 根据物理主键划分切片。

  1. ```sql private List splitTableIntoChunks(TableId tableId, Column splitColumn)
  2. throws SQLException {
  3. final String splitColumnName = splitColumn.name(); // select min, max final Object[] minMaxOfSplitColumn = queryMinMax(jdbc, tableId, splitColumnName); final Object min = minMaxOfSplitColumn[0]; final Object max = minMaxOfSplitColumn[1]; if (min == null || max == null || min.equals(max)) {
  4. // empty table, or only one row, return full table scan as a chunk
  5. return Collections.singletonList(ChunkRange.all());
  6. }
  7. final List chunks; if (splitColumnEvenlyDistributed(splitColumn)) {
  8. // use evenly-sized chunks which is much efficient
  9. // note: 按主键均匀划分
  10. chunks = splitEvenlySizedChunks(min, max);
  11. } else {
  12. // note: 按主键非均匀划分
  13. // use unevenly-sized chunks which will request many queries and is not efficient.
  14. chunks = splitUnevenlySizedChunks(tableId, splitColumnName, min, max);
  15. }
  16. return chunks; }
  17. /** Checks whether split column is evenly distributed across its range. */ private static boolean splitColumnEvenlyDistributed(Column splitColumn) { // only column is auto-incremental are recognized as evenly distributed. // TODO: we may use MAX,MIN,COUNT to calculate the distribution in the future. if (splitColumn.isAutoIncremented()) { DataType flinkType = MySqlTypeUtils.fromDbzColumn(splitColumn); LogicalTypeRoot typeRoot = flinkType.getLogicalType().getTypeRoot(); // currently, we only support split column with type BIGINT, INT, DECIMAL return typeRoot == LogicalTypeRoot.BIGINT || typeRoot == LogicalTypeRoot.INTEGER || typeRoot == LogicalTypeRoot.DECIMAL; } else { return false; } }
  18. /**
  19. 根据拆分列的最小值和最大值将表拆分为大小均匀的块,并以 {@link #chunkSize} 步长滚动块。
  20. Split table into evenly sized chunks based on the numeric min and max value of split column,
  21. and tumble chunks in {@link #chunkSize} step size.
  22. / private List splitEvenlySizedChunks(Object min, Object max) { if (ObjectUtils.compare(ObjectUtils.plus(min, chunkSize), max) > 0) {
  23.   // there is no more than one chunk, return full table as a chunk
  24.   return Collections.singletonList(ChunkRange.all());
  25. }
  26. final List splits = new ArrayList<>(); Object chunkStart = null; Object chunkEnd = ObjectUtils.plus(min, chunkSize); // chunkEnd <= max while (ObjectUtils.compare(chunkEnd, max) <= 0) {
  27.   splits.add(ChunkRange.of(chunkStart, chunkEnd));
  28.   chunkStart = chunkEnd;
  29.   chunkEnd = ObjectUtils.plus(chunkEnd, chunkSize);
  30. } // add the ending split splits.add(ChunkRange.of(chunkStart, null)); return splits; }
  31. /** 通过连续计算下一个块最大值,将表拆分为大小不均匀的块。
  32. Split table into unevenly sized chunks by continuously calculating next chunk max value. */ private List splitUnevenlySizedChunks(
  33. TableId tableId, String splitColumnName, Object min, Object max) throws SQLException {
  34. final List splits = new ArrayList<>(); Object chunkStart = null;
  35. Object chunkEnd = nextChunkEnd(min, tableId, splitColumnName, max); int count = 0; while (chunkEnd != null && ObjectUtils.compare(chunkEnd, max) <= 0) {
  36. // we start from [null, min + chunk_size) and avoid [null, min)
  37. splits.add(ChunkRange.of(chunkStart, chunkEnd));
  38. // may sleep a while to avoid DDOS on MySQL server
  39. maySleep(count++);
  40. chunkStart = chunkEnd;
  41. chunkEnd = nextChunkEnd(chunkEnd, tableId, splitColumnName, max);
  42. } // add the ending split splits.add(ChunkRange.of(chunkStart, null)); return splits; }
  43. private Object nextChunkEnd( Object previousChunkEnd, TableId tableId, String splitColumnName, Object max) throws SQLException { // chunk end might be null when max values are removed Object chunkEnd = queryNextChunkMax(jdbc, tableId, splitColumnName, chunkSize, previousChunkEnd); if (Objects.equals(previousChunkEnd, chunkEnd)) { // we don't allow equal chunk start and end, // should query the next one larger than chunkEnd chunkEnd = queryMin(jdbc, tableId, splitColumnName, chunkEnd); } if (ObjectUtils.compare(chunkEnd, max) >= 0) { return null; } else { return chunkEnd; } }
复制代码
MySqlSourceReader 处理切片分配请求



MySqlSourceReader 接收到切片分配请求后,会为先创建一个 SplitFetcher 线程,向 taskQueue 添加、执行 AddSplitsTask 任务用来处理添加分片任务,接着执行 FetchTask 使用 Debezium API 进行读取数据,读取的数据存储到 elementsQueue 中,SourceReaderBase 会从该队列中获取数据,并下发给 MySqlRecordEmitter。

处理切片分配事件时,创建 SplitFetcher 向 taskQueue 添加 AddSplitsTask。

  1. ```sql
  2. SingleThreadFetcherManager#addSplits
  3. public void addSplits(List<SplitT> splitsToAdd) {
  4.     SplitFetcher<E, SplitT> fetcher = getRunningFetcher();
  5.     if (fetcher == null) {
  6.         fetcher = createSplitFetcher();
  7.         // Add the splits to the fetchers.
  8.         fetcher.addSplits(splitsToAdd);
  9.         startFetcher(fetcher);
  10.     } else {
  11.         fetcher.addSplits(splitsToAdd);
  12.     }
  13. }
  14. // 创建 SplitFetcher
  15. protected synchronized SplitFetcher<E, SplitT> createSplitFetcher() {
  16.     if (closed) {
  17.         throw new IllegalStateException("The split fetcher manager has closed.");
  18.     }
  19.     // Create SplitReader.
  20.     SplitReader<E, SplitT> splitReader = splitReaderFactory.get();
  21.     int fetcherId = fetcherIdGenerator.getAndIncrement();
  22.     SplitFetcher<E, SplitT> splitFetcher =
  23.             new SplitFetcher<>(
  24.                     fetcherId,
  25.                     elementsQueue,
  26.                     splitReader,
  27.                     errorHandler,
  28.                     () -> {
  29.                         fetchers.remove(fetcherId);
  30.                         elementsQueue.notifyAvailable();
  31.                     });
  32.     fetchers.put(fetcherId, splitFetcher);
  33.     return splitFetcher;
  34. }
  35. public void addSplits(List<SplitT> splitsToAdd) {
  36.     enqueueTask(new AddSplitsTask<>(splitReader, splitsToAdd, assignedSplits));
  37.     wakeUp(true);
  38. }
复制代码

执行 SplitFetcher线程,首次执行 AddSplitsTask 线程添加分片,以后执行 FetchTask 线程拉取数据。

  1. SplitFetcher#runOnce
  2. void runOnce() {
  3. try {
  4.      if (shouldRunFetchTask()) {
  5.          runningTask = fetchTask;
  6.      } else {
  7.          runningTask = taskQueue.take();
  8.      }
  9.      
  10.      if (!wakeUp.get() && runningTask.run()) {
  11.          LOG.debug("Finished running task {}", runningTask);
  12.          runningTask = null;
  13.          checkAndSetIdle();
  14.      }
  15. } catch (Exception e) {
  16.      throw new RuntimeException(
  17.              String.format(
  18.                      "SplitFetcher thread %d received unexpected exception while polling the records",
  19.                      id),
  20.              e);
  21. }
  22. maybeEnqueueTask(runningTask);
  23. synchronized (wakeUp) {
  24.      // Set the running task to null. It is necessary for the shutdown method to avoid
  25.      // unnecessarily interrupt the running task.
  26.      runningTask = null;
  27.      // Set the wakeUp flag to false.
  28.      wakeUp.set(false);
  29.      LOG.debug("Cleaned wakeup flag.");
  30. }
  31. }
复制代码

AddSplitsTask 调用 MySqlSplitReader 的 handleSplitsChanges 方法,向切片队列中添加已分配的切片信息。在下一次 fetch() 调用时,从队列中获取切片并读取切片数据。

  1. AddSplitsTask#run
  2. public boolean run() {
  3.     for (SplitT s : splitsToAdd) {
  4.         assignedSplits.put(s.splitId(), s);
  5.     }
  6.     splitReader.handleSplitsChanges(new SplitsAddition<>(splitsToAdd));
  7.     return true;
  8. }
  9. MySqlSplitReader#handleSplitsChanges
  10. public void handleSplitsChanges(SplitsChange<MySqlSplit> splitsChanges) {
  11.     if (!(splitsChanges instanceof SplitsAddition)) {
  12.         throw new UnsupportedOperationException(
  13.                 String.format(
  14.                         "The SplitChange type of %s is not supported.",
  15.                         splitsChanges.getClass()));
  16.     }
  17.     //note: 添加切片 到队列。
  18.     splits.addAll(splitsChanges.splits());
  19. }
复制代码

MySqlSplitReader 执行 fetch(),由 DebeziumReader 读取数据到事件队列,在对数据修正后以 MySqlRecords 格式返回。

  1. MySqlSplitReader#fetch
  2. @Override
  3. public RecordsWithSplitIds<SourceRecord> fetch() throws IOException {
  4.     // note: 创建Reader 并读取数据
  5.     checkSplitOrStartNext();
  6.     Iterator<SourceRecord> dataIt = null;
  7.     try {
  8.         // note:  对读取的数据进行修正
  9.         dataIt = currentReader.pollSplitRecords();
  10.     } catch (InterruptedException e) {
  11.         LOG.warn("fetch data failed.", e);
  12.         throw new IOException(e);
  13.     }
  14.     //  note: 返回的数据被封装为 MySqlRecords 进行传输
  15.     return dataIt == null
  16.             ? finishedSnapshotSplit()   
  17.             : MySqlRecords.forRecords(currentSplitId, dataIt);
  18. }
  19. private void checkSplitOrStartNext() throws IOException {
  20.     // the binlog reader should keep alive
  21.     if (currentReader instanceof BinlogSplitReader) {
  22.         return;
  23.     }
  24.     if (canAssignNextSplit()) {
  25.         // note:  从切片队列读取MySqlSplit
  26.         final MySqlSplit nextSplit = splits.poll();
  27.         if (nextSplit == null) {
  28.             throw new IOException("Cannot fetch from another split - no split remaining");
  29.         }
  30.         currentSplitId = nextSplit.splitId();
  31.         // note:  区分全量切片读取还是增量切片读取
  32.         if (nextSplit.isSnapshotSplit()) {
  33.             if (currentReader == null) {
  34.                 final MySqlConnection jdbcConnection = getConnection(config);
  35.                 final BinaryLogClient binaryLogClient = getBinaryClient(config);
  36.                 final StatefulTaskContext statefulTaskContext =
  37.                         new StatefulTaskContext(config, binaryLogClient, jdbcConnection);
  38.                 // note: 创建SnapshotSplitReader,使用Debezium Api读取分配数据及区间Binlog值
  39.                 currentReader = new SnapshotSplitReader(statefulTaskContext, subtaskId);
  40.             }
  41.         } else {
  42.             // point from snapshot split to binlog split
  43.             if (currentReader != null) {
  44.                 LOG.info("It's turn to read binlog split, close current snapshot reader");
  45.                 currentReader.close();
  46.             }
  47.             final MySqlConnection jdbcConnection = getConnection(config);
  48.             final BinaryLogClient binaryLogClient = getBinaryClient(config);
  49.             final StatefulTaskContext statefulTaskContext =
  50.                     new StatefulTaskContext(config, binaryLogClient, jdbcConnection);
  51.             LOG.info("Create binlog reader");
  52.             // note: 创建BinlogSplitReader,使用Debezium API进行增量读取
  53.             currentReader = new BinlogSplitReader(statefulTaskContext, subtaskId);
  54.         }
  55.         // note: 执行Reader进行数据读取
  56.         currentReader.submitSplit(nextSplit);
  57.     }
  58. }
复制代码

5. DebeziumReader 数据处理

DebeziumReader 包含全量切片读取、增量切片读取两个阶段,数据读取后存储到 ChangeEventQueue,执行pollSplitRecords 时对数据进行修正。

SnapshotSplitReader 全量切片读取。全量阶段的数据读取通过执行 Select 语句查询出切片范围内的表数据,在写入队列前后执行 SHOW MASTER STATUS 时,写入当前偏移量。

  1. public void submitSplit(MySqlSplit mySqlSplit) {
  2. ......
  3. executor.submit(
  4.          () -> {
  5.              try {
  6.                  currentTaskRunning = true;
  7.                  // note: 数据读取,在数据前后插入Binlog当前偏移量
  8.                  // 1. execute snapshot read task。
  9.                  final SnapshotSplitChangeEventSourceContextImpl sourceContext =
  10.                          new SnapshotSplitChangeEventSourceContextImpl();
  11.                  SnapshotResult snapshotResult =
  12.                          splitSnapshotReadTask.execute(sourceContext);
  13.                  //  note: 为增量读取做准备,包含了起始偏移量
  14.                  final MySqlBinlogSplit appendBinlogSplit = createBinlogSplit(sourceContext);
  15.                  final MySqlOffsetContext mySqlOffsetContext =
  16.                          statefulTaskContext.getOffsetContext();
  17.                  mySqlOffsetContext.setBinlogStartPoint(
  18.                          appendBinlogSplit.getStartingOffset().getFilename(),
  19.                          appendBinlogSplit.getStartingOffset().getPosition());
  20.                  //  note: 从起始偏移量开始读取           
  21.                  // 2. execute binlog read task
  22.                  if (snapshotResult.isCompletedOrSkipped()) {
  23.                      // we should only capture events for the current table,
  24.                      Configuration dezConf =
  25.                              statefulTaskContext
  26.                                      .getDezConf()
  27.                                      .edit()
  28.                                      .with(
  29.                                              "table.whitelist",
  30.                                              currentSnapshotSplit.getTableId())
  31.                                      .build();
  32.                      // task to read binlog for current split
  33.                      MySqlBinlogSplitReadTask splitBinlogReadTask =
  34.                              new MySqlBinlogSplitReadTask(
  35.                                      new MySqlConnectorConfig(dezConf),
  36.                                      mySqlOffsetContext,
  37.                                      statefulTaskContext.getConnection(),
  38.                                      statefulTaskContext.getDispatcher(),
  39.                                      statefulTaskContext.getErrorHandler(),
  40.                                      StatefulTaskContext.getClock(),
  41.                                      statefulTaskContext.getTaskContext(),
  42.                                      (MySqlStreamingChangeEventSourceMetrics)
  43.                                              statefulTaskContext
  44.                                                      .getStreamingChangeEventSourceMetrics(),
  45.                                      statefulTaskContext
  46.                                              .getTopicSelector()
  47.                                              .getPrimaryTopic(),
  48.                                      appendBinlogSplit);
  49.                      splitBinlogReadTask.execute(
  50.                              new SnapshotBinlogSplitChangeEventSourceContextImpl());
  51.                  } else {
  52.                      readException =
  53.                              new IllegalStateException(
  54.                                      String.format(
  55.                                              "Read snapshot for mysql split %s fail",
  56.                                              currentSnapshotSplit));
  57.                  }
  58.              } catch (Exception e) {
  59.                  currentTaskRunning = false;
  60.                  LOG.error(
  61.                          String.format(
  62.                                  "Execute snapshot read task for mysql split %s fail",
  63.                                  currentSnapshotSplit),
  64.                          e);
  65.                  readException = e;
  66.              }
  67.          });
  68. }
复制代码

SnapshotSplitReader 增量切片读取。增量阶段切片读取重点是判断 BinlogSplitReadTask 什么时候停止,在读取到分片阶段的结束时的偏移量即终止。

  1. MySqlBinlogSplitReadTask#handleEvent
  2. protected void handleEvent(Event event) {
  3. // note: 事件下发 队列
  4. super.handleEvent(event);
  5. // note: 全量读取阶段需要终止Binlog读取
  6. // check do we need to stop for read binlog for snapshot split.
  7. if (isBoundedRead()) {
  8.      final BinlogOffset currentBinlogOffset =
  9.              new BinlogOffset(
  10.                      offsetContext.getOffset().get(BINLOG_FILENAME_OFFSET_KEY).toString(),
  11.                      Long.parseLong(
  12.                              offsetContext
  13.                                      .getOffset()
  14.                                      .get(BINLOG_POSITION_OFFSET_KEY)
  15.                                      .toString()));
  16.      // note: currentBinlogOffset > HW 停止读取
  17.      // reach the high watermark, the binlog reader should finished
  18.      if (currentBinlogOffset.isAtOrBefore(binlogSplit.getEndingOffset())) {
  19.          // send binlog end event
  20.          try {
  21.              signalEventDispatcher.dispatchWatermarkEvent(
  22.                      binlogSplit,
  23.                      currentBinlogOffset,
  24.                      SignalEventDispatcher.WatermarkKind.BINLOG_END);
  25.          } catch (InterruptedException e) {
  26.              logger.error("Send signal event error.", e);
  27.              errorHandler.setProducerThrowable(
  28.                      new DebeziumException("Error processing binlog signal event", e));
  29.          }
  30.          //  终止binlog读取
  31.          // tell reader the binlog task finished
  32.          ((SnapshotBinlogSplitChangeEventSourceContextImpl) context).finished();
  33.      }
  34. }
  35. }
复制代码

SnapshotSplitReader 执行 pollSplitRecords 时对队列中的原始数据进行修正。 具体处理逻辑查看 RecordUtils#normalizedSplitRecords。

  1. public Iterator<SourceRecord> pollSplitRecords() throws InterruptedException {
  2. if (hasNextElement.get()) {
  3.      // data input: [low watermark event][snapshot events][high watermark event][binlogevents][binlog-end event]
  4.      // data output: [low watermark event][normalized events][high watermark event]
  5.      boolean reachBinlogEnd = false;
  6.      final List<SourceRecord> sourceRecords = new ArrayList<>();
  7.      while (!reachBinlogEnd) {
  8.          // note: 处理队列中写入的 DataChangeEvent 事件
  9.          List<DataChangeEvent> batch = queue.poll();
  10.          for (DataChangeEvent event : batch) {
  11.              sourceRecords.add(event.getRecord());
  12.              if (RecordUtils.isEndWatermarkEvent(event.getRecord())) {
  13.                  reachBinlogEnd = true;
  14.                  break;
  15.              }
  16.          }
  17.      }
  18.      // snapshot split return its data once
  19.      hasNextElement.set(false);
  20.      //  ************   修正数据  ***********
  21.      return normalizedSplitRecords(currentSnapshotSplit, sourceRecords, nameAdjuster)
  22.              .iterator();
  23. }
  24. // the data has been polled, no more data
  25. reachEnd.compareAndSet(false, true);
  26. return null;
  27. }
复制代码

BinlogSplitReader 数据读取。读取逻辑比较简单,重点是起始偏移量的设置,起始偏移量为所有切片的 HW。

BinlogSplitReader 执行 pollSplitRecords 时对队列中的原始数据进行修正,保障数据一致性。 增量阶段的Binlog读取是无界的,数据会全部下发到事件队列,BinlogSplitReader 通过 shouldEmit() 判断数据是否下发。

  1. BinlogSplitReader#pollSplitRecords
  2. public Iterator<SourceRecord> pollSplitRecords() throws InterruptedException {
  3. checkReadException();
  4. final List<SourceRecord> sourceRecords = new ArrayList<>();
  5. if (currentTaskRunning) {
  6.      List<DataChangeEvent> batch = queue.poll();
  7.      for (DataChangeEvent event : batch) {
  8.          if (shouldEmit(event.getRecord())) {
  9.              sourceRecords.add(event.getRecord());
  10.          }
  11.      }
  12. }
  13. return sourceRecords.iterator();
  14. }
复制代码

事件下发条件:

  • 新收到的 event post 大于 maxwm;
  • 当前 data 值所属某个 snapshot spilt & 偏移量大于 HWM,下发数据。

  1. /**
  2. *
  3. * Returns the record should emit or not.
  4. *
  5. * <p>The watermark signal algorithm is the binlog split reader only sends the binlog event that
  6. * belongs to its finished snapshot splits. For each snapshot split, the binlog event is valid
  7. * since the offset is after its high watermark.
  8. *
  9. * <pre> E.g: the data input is :
  10. *    snapshot-split-0 info : [0,    1024) highWatermark0
  11. *    snapshot-split-1 info : [1024, 2048) highWatermark1
  12. *  the data output is:
  13. *  only the binlog event belong to [0,    1024) and offset is after highWatermark0 should send,
  14. *  only the binlog event belong to [1024, 2048) and offset is after highWatermark1 should send.
  15. * </pre>
  16. */
  17. private boolean shouldEmit(SourceRecord sourceRecord) {
  18.     if (isDataChangeRecord(sourceRecord)) {
  19.         TableId tableId = getTableId(sourceRecord);
  20.         BinlogOffset position = getBinlogPosition(sourceRecord);
  21.         // aligned, all snapshot splits of the table has reached max highWatermark
  22.       
  23.         // note:  新收到的event post 大于 maxwm ,直接下发
  24.         if (position.isAtOrBefore(maxSplitHighWatermarkMap.get(tableId))) {
  25.             return true;
  26.         }
  27.         Object[] key =
  28.                 getSplitKey(
  29.                         currentBinlogSplit.getSplitKeyType(),
  30.                         sourceRecord,
  31.                         statefulTaskContext.getSchemaNameAdjuster());
  32.         for (FinishedSnapshotSplitInfo splitInfo : finishedSplitsInfo.get(tableId)) {
  33.             /**
  34.              *  note: 当前 data值所属某个snapshot spilt &  偏移量大于 HWM,下发数据
  35.              */
  36.             if (RecordUtils.splitKeyRangeContains(
  37.                             key, splitInfo.getSplitStart(), splitInfo.getSplitEnd())
  38.                     && position.isAtOrBefore(splitInfo.getHighWatermark())) {
  39.                 return true;
  40.             }
  41.         }
  42.         // not in the monitored splits scope, do not emit
  43.         return false;
  44.     }
  45.     // always send the schema change event and signal event
  46.     // we need record them to state of Flink
  47.     return true;
  48. }
复制代码

6. MySqlRecordEmitter 数据下发

SourceReaderBase 从队列中获取切片读取的 DataChangeEvent 数据集合,将数据类型由 Debezium 的 DataChangeEvent 转换为 Flink 的 RowData 类型。

SourceReaderBase 处理切片数据流程。

  1. ```java org.apache.flink.connector.base.source.reader.SourceReaderBase#pollNext public InputStatus pollNext(ReaderOutput output) throws Exception { // make sure we have a fetch we are working on, or move to the next RecordsWithSplitIds recordsWithSplitId = this.currentFetch; if (recordsWithSplitId == null) {
  2. recordsWithSplitId = getNextFetch(output);
  3. if (recordsWithSplitId == null) {
  4.      return trace(finishedOrAvailableLater());
  5. }
  6. }
  7. // we need to loop here, because we may have to go across splits while (true) {
  8. // Process one record.
  9. // note:  通过MySqlRecords从迭代器中读取单条数据
  10. final E record = recordsWithSplitId.nextRecordFromSplit();
  11. if (record != null) {
  12.      // emit the record.
  13.      recordEmitter.emitRecord(record, currentSplitOutput, currentSplitContext.state);
  14.      LOG.trace("Emitted record: {}", record);
  15.      // We always emit MORE_AVAILABLE here, even though we do not strictly know whether
  16.      // more is available. If nothing more is available, the next invocation will find
  17.      // this out and return the correct status.
  18.      // That means we emit the occasional 'false positive' for availability, but this
  19.      // saves us doing checks for every record. Ultimately, this is cheaper.
  20.      return trace(InputStatus.MORE_AVAILABLE);
  21. } else if (!moveToNextSplit(recordsWithSplitId, output)) {
  22.      // The fetch is done and we just discovered that and have not emitted anything, yet.
  23.      // We need to move to the next fetch. As a shortcut, we call pollNext() here again,
  24.      // rather than emitting nothing and waiting for the caller to call us again.
  25.      return pollNext(output);
  26. }
  27. // else fall through the loop
  28. } }
  29. private RecordsWithSplitIds getNextFetch(final ReaderOutput output) { splitFetcherManager.checkErrors();
  30. LOG.trace("Getting next source data batch from queue");
  31. // note: 从elementsQueue 获取数据
  32. final RecordsWithSplitIds<E> recordsWithSplitId = elementsQueue.poll();
  33. if (recordsWithSplitId == null || !moveToNextSplit(recordsWithSplitId, output)) {
  34.     return null;
  35. }
  36. currentFetch = recordsWithSplitId;
  37. return recordsWithSplitId;
  38. }
复制代码

MySqlRecords 返回单条数据集合。

  1. ```java
  2. com.ververica.cdc.connectors.mysql.source.split.MySqlRecords#nextRecordFromSplit
  3. public SourceRecord nextRecordFromSplit() {
  4.     final Iterator<SourceRecord> recordsForSplit = this.recordsForCurrentSplit;
  5.     if (recordsForSplit != null) {
  6.         if (recordsForSplit.hasNext()) {
  7.             return recordsForSplit.next();
  8.         } else {
  9.             return null;
  10.         }
  11.     } else {
  12.         throw new IllegalStateException();
  13.     }
  14. }
复制代码

MySqlRecordEmitter 通过 RowDataDebeziumDeserializeSchema 将数据转换为Rowdata。

  1. com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter#emitRecord
  2. public void emitRecord(SourceRecord element, SourceOutput<T> output, MySqlSplitState splitState)
  3.     throws Exception {
  4. if (isWatermarkEvent(element)) {
  5.     BinlogOffset watermark = getWatermark(element);
  6.     if (isHighWatermarkEvent(element) && splitState.isSnapshotSplitState()) {
  7.         splitState.asSnapshotSplitState().setHighWatermark(watermark);
  8.     }
  9. } else if (isSchemaChangeEvent(element) && splitState.isBinlogSplitState()) {
  10.     HistoryRecord historyRecord = getHistoryRecord(element);
  11.     Array tableChanges =
  12.             historyRecord.document().getArray(HistoryRecord.Fields.TABLE_CHANGES);
  13.     TableChanges changes = TABLE_CHANGE_SERIALIZER.deserialize(tableChanges, true);
  14.     for (TableChanges.TableChange tableChange : changes) {
  15.         splitState.asBinlogSplitState().recordSchema(tableChange.getId(), tableChange);
  16.     }
  17. } else if (isDataChangeRecord(element)) {
  18.     //  note: 数据的处理
  19.     if (splitState.isBinlogSplitState()) {
  20.         BinlogOffset position = getBinlogPosition(element);
  21.         splitState.asBinlogSplitState().setStartingOffset(position);
  22.     }
  23.     debeziumDeserializationSchema.deserialize(
  24.             element,
  25.             new Collector<T>() {
  26.                 @Override
  27.                 public void collect(final T t) {
  28.                     output.collect(t);
  29.                 }
  30.                 @Override
  31.                 public void close() {
  32.                     // do nothing
  33.                 }
  34.             });
  35. } else {
  36.     // unknown element
  37.     LOG.info("Meet unknown element {}, just skip.", element);
  38. }
  39. }
复制代码

RowDataDebeziumDeserializeSchema 序列化过程。

  1. com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema#deserialize
  2. public void deserialize(SourceRecord record, Collector<RowData> out) throws Exception {
  3.     Envelope.Operation op = Envelope.operationFor(record);
  4.     Struct value = (Struct) record.value();
  5.     Schema valueSchema = record.valueSchema();
  6.     if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) {
  7.         GenericRowData insert = extractAfterRow(value, valueSchema);
  8.         validator.validate(insert, RowKind.INSERT);
  9.         insert.setRowKind(RowKind.INSERT);
  10.         out.collect(insert);
  11.     } else if (op == Envelope.Operation.DELETE) {
  12.         GenericRowData delete = extractBeforeRow(value, valueSchema);
  13.         validator.validate(delete, RowKind.DELETE);
  14.         delete.setRowKind(RowKind.DELETE);
  15.         out.collect(delete);
  16.     } else {
  17.         GenericRowData before = extractBeforeRow(value, valueSchema);
  18.         validator.validate(before, RowKind.UPDATE_BEFORE);
  19.         before.setRowKind(RowKind.UPDATE_BEFORE);
  20.         out.collect(before);
  21.         GenericRowData after = extractAfterRow(value, valueSchema);
  22.         validator.validate(after, RowKind.UPDATE_AFTER);
  23.         after.setRowKind(RowKind.UPDATE_AFTER);
  24.         out.collect(after);
  25.     }
  26. }
复制代码

7. MySqlSourceReader 汇报切片读取完成事件

MySqlSourceReader 处理完一个全量切片后,会向 MySqlSourceEnumerator 发送已完成的切片信息,包含切片 ID、HighWatermar ,然后继续发送切片请求。

  1. com.ververica.cdc.connectors.mysql.source.reader.MySqlSourceReader#onSplitFinished
  2. protected void onSplitFinished(Map<String, MySqlSplitState> finishedSplitIds) {
  3. for (MySqlSplitState mySqlSplitState : finishedSplitIds.values()) {
  4.     MySqlSplit mySqlSplit = mySqlSplitState.toMySqlSplit();
  5.     finishedUnackedSplits.put(mySqlSplit.splitId(), mySqlSplit.asSnapshotSplit());
  6. }
  7. /**
  8. *   note: 发送切片完成事件
  9. */
  10. reportFinishedSnapshotSplitsIfNeed();
  11. //  上一个spilt处理完成后继续发送切片请求
  12. context.sendSplitRequest();
  13. }
  14. private void reportFinishedSnapshotSplitsIfNeed() {
  15.     if (!finishedUnackedSplits.isEmpty()) {
  16.         final Map<String, BinlogOffset> finishedOffsets = new HashMap<>();
  17.         for (MySqlSnapshotSplit split : finishedUnackedSplits.values()) {
  18.             // note: 发送切片ID,及最大偏移量
  19.             finishedOffsets.put(split.splitId(), split.getHighWatermark());
  20.         }
  21.         FinishedSnapshotSplitsReportEvent reportEvent =
  22.                 new FinishedSnapshotSplitsReportEvent(finishedOffsets);
  23.         context.sendSourceEventToCoordinator(reportEvent);
  24.         LOG.debug(
  25.                 "The subtask {} reports offsets of finished snapshot splits {}.",
  26.                 subtaskId,
  27.                 finishedOffsets);
  28.     }
  29. }
复制代码

8. MySqlSourceEnumerator 分配增量切片

全量阶段所有分片读取完毕后,MySqlHybridSplitAssigner 会创建 BinlogSplit 进行后续增量读取,在创建 BinlogSplit 会从全部已完成的全量切片中筛选最小 BinlogOffset。注意:2.0.0 分支 createBinlogSplit 最小偏移量总是从 0 开始,最新 master 分支已经修复这个 BUG。

  1. private MySqlBinlogSplit createBinlogSplit() {
  2.     final List<MySqlSnapshotSplit> assignedSnapshotSplit =
  3.             snapshotSplitAssigner.getAssignedSplits().values().stream()
  4.                     .sorted(Comparator.comparing(MySqlSplit::splitId))
  5.                     .collect(Collectors.toList());
  6.     Map<String, BinlogOffset> splitFinishedOffsets =
  7.             snapshotSplitAssigner.getSplitFinishedOffsets();
  8.     final List<FinishedSnapshotSplitInfo> finishedSnapshotSplitInfos = new ArrayList<>();
  9.     final Map<TableId, TableChanges.TableChange> tableSchemas = new HashMap<>();
  10.     BinlogOffset minBinlogOffset = null;
  11.     // note: 从所有assignedSnapshotSplit中筛选最小偏移量
  12.     for (MySqlSnapshotSplit split : assignedSnapshotSplit) {
  13.         // find the min binlog offset
  14.         BinlogOffset binlogOffset = splitFinishedOffsets.get(split.splitId());
  15.         if (minBinlogOffset == null || binlogOffset.compareTo(minBinlogOffset) < 0) {
  16.             minBinlogOffset = binlogOffset;
  17.         }
  18.         finishedSnapshotSplitInfos.add(
  19.                 new FinishedSnapshotSplitInfo(
  20.                         split.getTableId(),
  21.                         split.splitId(),
  22.                         split.getSplitStart(),
  23.                         split.getSplitEnd(),
  24.                         binlogOffset));
  25.         tableSchemas.putAll(split.getTableSchemas());
  26.     }
  27.     final MySqlSnapshotSplit lastSnapshotSplit =
  28.             assignedSnapshotSplit.get(assignedSnapshotSplit.size() - 1).asSnapshotSplit();
  29.       
  30.     return new MySqlBinlogSplit(
  31.             BINLOG_SPLIT_ID,
  32.             lastSnapshotSplit.getSplitKeyType(),
  33.             minBinlogOffset == null ? BinlogOffset.INITIAL_OFFSET : minBinlogOffset,
  34.             BinlogOffset.NO_STOPPING_OFFSET,
  35.             finishedSnapshotSplitInfos,
  36.             tableSchemas);
  37. }
复制代码




最新经典文章,欢迎关注公众号



---------------------

作者:任建旭
来源:flink-learning
原文:Apache Flink CDC 批流融合技术原理分析



没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条