分享

一文了解Flink1.11中CDC Connectors操作实现

本帖最后由 levycui 于 2020-11-4 22:13 编辑
问题导读:
1、CDC是什么?
2、Flink提供的 table format如何实现?
3、如何实现mysql-cdc的操作?
4、如何实现changelog-json的操作?



Flink1.11引入了CDC的connector,通过这种方式可以很方便地捕获变化的数据,大大简化了数据处理的流程。Flink1.11的CDC connector主要包括:MySQL CDC和Postgres CDC,同时对Kafka的Connector支持canal-json和debezium-json以及changelog-json的format。本文主要分享以下内容:

  •     CDC简介
  •     Flink提供的 table format
  •     使用过程中的注意点
  •     mysql-cdc的操作实践
  •     canal-json的操作实践
  •     changelog-json的操作实践

简介

Flink CDC Connector 是ApacheFlink的一组数据源连接器,使用变化数据捕获change data capture (CDC))从不同的数据库中提取变更数据。Flink CDC连接器将Debezium集成为引擎来捕获数据变更。因此,它可以充分利用Debezium的功能。
特点

  •     支持读取数据库快照,并且能够持续读取数据库的变更日志,即使发生故障,也支持exactly-once 的处理语义
  •     对于DataStream API的CDC connector,用户无需部署Debezium和Kafka,即可在单个作业中使用多个数据库和表上的变更数据。
  •     对于Table/SQL API 的CDC connector,用户可以使用SQL DDL创建CDC数据源,来监视单个表上的数据变更。

使用场景

  •     数据库之间的增量数据同步
  •     审计日志
  •     数据库之上的实时物化视图
  •     基于CDC的维表join
  •     …

Flink提供的 table format

Flink提供了一系列可以用于table connector的table format,具体如下:
2020-11-04_185220.jpg

使用过程中的注意点
使用MySQL CDC的注意点

如果要使用MySQL CDC connector,对于程序而言,需要添加如下依赖:

  1. <dependency>
  2.   <groupId>com.alibaba.ververica</groupId>
  3.   <artifactId>flink-connector-mysql-cdc</artifactId>
  4.   <version>1.0.0</version>
  5. </dependency>
复制代码


如果要使用Flink SQL Client,需要添加如下jar包:flink-sql-connector-mysql-cdc-1.0.0.jar,将该jar包放在Flink安装目录的lib文件夹下即可。
使用canal-json的注意点

如果要使用Kafka的canal-json,对于程序而言,需要添加如下依赖:

  1. <!-- universal -->
  2. <dependency>
  3.     <groupId>org.apache.flink</groupId>
  4.     <artifactId>flink-connector-kafka_2.11</artifactId>
  5.     <version>1.11.0</version>
  6. </dependency>
复制代码

如果要使用Flink SQL Client,需要添加如下jar包:flink-sql-connector-kafka_2.11-1.11.0.jar,将该jar包放在Flink安装目录的lib文件夹下即可。由于Flink1.11的安装包 的lib目录下并没有提供该jar包,所以必须要手动添加依赖包,否则会报如下错误:

  1. [ERROR] Could not execute SQL statement. Reason:
  2. org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableSourceFactory' in the classpath.
  3. Available factory identifiers are:
  4. datagen
  5. mysql-cdc
复制代码

使用changelog-json的注意点

如果要使用Kafka的changelog-json Format,对于程序而言,需要添加如下依赖:

  1. <dependency>
  2.   <groupId>com.alibaba.ververica</groupId>
  3.   <artifactId>flink-format-changelog-json</artifactId>
  4.   <version>1.0.0</version>
  5. </dependency>
复制代码

如果要使用Flink SQL Client,需要添加如下jar包:flink-format-changelog-json-1.0.0.jar,将该jar包放在Flink安装目录的lib文件夹下即可。

mysql-cdc的操作实践
创建MySQL数据源表

在创建MySQL CDC表之前,需要先创建MySQL的数据表,如下:

  1. -- MySQL
  2. /*Table structure for table `order_info` */
  3. DROP TABLE IF EXISTS `order_info`;
  4. CREATE TABLE `order_info` (
  5.   `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '编号',
  6.   `consignee` varchar(100) DEFAULT NULL COMMENT '收货人',
  7.   `consignee_tel` varchar(20) DEFAULT NULL COMMENT '收件人电话',
  8.   `total_amount` decimal(10,2) DEFAULT NULL COMMENT '总金额',
  9.   `order_status` varchar(20) DEFAULT NULL COMMENT '订单状态,1表示下单,2表示支付',
  10.   `user_id` bigint(20) DEFAULT NULL COMMENT '用户id',
  11.   `payment_way` varchar(20) DEFAULT NULL COMMENT '付款方式',
  12.   `delivery_address` varchar(1000) DEFAULT NULL COMMENT '送货地址',
  13.   `order_comment` varchar(200) DEFAULT NULL COMMENT '订单备注',
  14.   `out_trade_no` varchar(50) DEFAULT NULL COMMENT '订单交易编号(第三方支付用)',
  15.   `trade_body` varchar(200) DEFAULT NULL COMMENT '订单描述(第三方支付用)',
  16.   `create_time` datetime DEFAULT NULL COMMENT '创建时间',
  17.   `operate_time` datetime DEFAULT NULL COMMENT '操作时间',
  18.   `expire_time` datetime DEFAULT NULL COMMENT '失效时间',
  19.   `tracking_no` varchar(100) DEFAULT NULL COMMENT '物流单编号',
  20.   `parent_order_id` bigint(20) DEFAULT NULL COMMENT '父订单编号',
  21.   `img_url` varchar(200) DEFAULT NULL COMMENT '图片路径',
  22.   `province_id` int(20) DEFAULT NULL COMMENT '地区',
  23.   PRIMARY KEY (`id`)
  24. ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='订单表';
  25. -- ----------------------------
  26. -- Records of order_info
  27. -- ----------------------------
  28. INSERT INTO `order_info`
  29. VALUES (476, 'lAXjcL', '13408115089', 433.00, '2', 10, '2', 'OYyAdSdLxedceqovndCD', 'ihjAYsSjrgJMQVdFQnSy', '8728720206', '', '2020-06-18 02:21:38', NULL, NULL, NULL, NULL, NULL, 9);
  30. INSERT INTO `order_info`
  31. VALUES (477, 'QLiFDb', '13415139984', 772.00, '1', 90, '2', 'OizYrQbKuWvrvdfpkeSZ', 'wiBhhqhMndCCgXwmWVQq', '1679381473', '', '2020-06-18 09:12:25', NULL, NULL, NULL, NULL, NULL, 3);
  32. INSERT INTO `order_info`
  33. VALUES (478, 'iwKjQD', '13320383859', 88.00, '1', 107, '1', 'cbXLKtNHWOcWzJVBWdAs', 'njjsnknHxsxhuCCeNDDi', '0937074290', '', '2020-06-18 15:56:34', NULL, NULL, NULL, NULL, NULL, 7);
  34. /*Table structure for table `order_detail` */
  35. CREATE TABLE `order_detail` (
  36.   `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '编号',
  37.   `order_id` bigint(20) DEFAULT NULL COMMENT '订单编号',
  38.   `sku_id` bigint(20) DEFAULT NULL COMMENT 'sku_id',
  39.   `sku_name` varchar(200) DEFAULT NULL COMMENT 'sku名称(冗余)',
  40.   `img_url` varchar(200) DEFAULT NULL COMMENT '图片名称(冗余)',
  41.   `order_price` decimal(10,2) DEFAULT NULL COMMENT '购买价格(下单时sku价格)',
  42.   `sku_num` varchar(200) DEFAULT NULL COMMENT '购买个数',
  43.   `create_time` datetime DEFAULT NULL COMMENT '创建时间',
  44.   PRIMARY KEY (`id`)
  45. ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='订单明细表';
  46. -- ----------------------------
  47. -- Records of order_detail
  48. -- ----------------------------
  49. INSERT INTO `order_detail`
  50. VALUES (1329, 476, 8, 'Apple iPhone XS Max (A2104) 256GB 深空灰色 移动联通电信4G手机 双卡双待', 'http://XLMByOyZDTJQYxphQHNTgYAFzJJCKTmCbzvEJIpz', 8900.00, '3', '2020-06-18 02:21:38');
  51. INSERT INTO `order_detail`
  52. VALUES (1330, 477, 9, '荣耀10 GT游戏加速 AIS手持夜景 6GB+64GB 幻影蓝全网通 移动联通电信', 'http://ixOCtlYmlxEEgUfPLiLdjMftzrleOEIBKSjrhMne', 2452.00, '4', '2020-06-18 09:12:25');
  53. INSERT INTO `order_detail`
  54. VALUES (1331, 478, 4, '小米Play 流光渐变AI双摄 4GB+64GB 梦幻蓝 全网通4G 双卡双待 小水滴全面屏拍照游戏智能手机', 'http://RqfEFnAOqnqRnNZLFRvBuwXxwNBtptYJCILDKQYv', 1442.00, '1', '2020-06-18 15:56:34');
  55. INSERT INTO `order_detail`
  56. VALUES (1332, 478, 8, 'Apple iPhone XS Max (A2104) 256GB 深空灰色 移动联通电信4G手机 双卡双待', 'http://IwhuCDlsiLenfKjPzbJrIoxswdfofKhJLMzlJAKV', 8900.00, '3', '2020-06-18 15:56:34');
  57. INSERT INTO `order_detail`
  58. VALUES (1333, 478, 8, 'Apple iPhone XS Max (A2104) 256GB 深空灰色 移动联通电信4G手机 双卡双待', 'http://bbfwTbAzTWapywODzOtDJMJUEqNTeRTUQuCDkqXP', 8900.00, '1', '2020-06-18 15:56:34');
复制代码

Flink SQL Cli创建CDC数据源

启动 Flink 集群,再启动 SQL CLI,执行下面命令:

-- 创建订单信息表
  1. CREATE TABLE order_info(
  2.     id BIGINT,
  3.     user_id BIGINT,
  4.     create_time TIMESTAMP(0),
  5.     operate_time TIMESTAMP(0),
  6.     province_id INT,
  7.     order_status STRING,
  8.     total_amount DECIMAL(10, 5)
  9.   ) WITH (
  10.     'connector' = 'mysql-cdc',
  11.     'hostname' = 'kms-1',
  12.     'port' = '3306',
  13.     'username' = 'root',
  14.     'password' = '123qwe',
  15.     'database-name' = 'mydw',
  16.     'table-name' = 'order_info'
  17. );
复制代码

在Flink SQL Cli中查询该表的数据:result-mode: tableau,+表示数据的insert
2020-11-04_185304.jpg

在SQL CLI中创建订单详情表:

  1. CREATE TABLE order_detail(
  2.     id BIGINT,
  3.     order_id BIGINT,
  4.     sku_id BIGINT,
  5.     sku_name STRING,
  6.     sku_num BIGINT,
  7.     order_price DECIMAL(10, 5),
  8.     create_time TIMESTAMP(0)
  9. ) WITH (
  10.     'connector' = 'mysql-cdc',
  11.     'hostname' = 'kms-1',
  12.     'port' = '3306',
  13.     'username' = 'root',
  14.     'password' = '123qwe',
  15.     'database-name' = 'mydw',
  16.     'table-name' = 'order_detail'
  17. );
复制代码

查询结果如下:
2020-11-04_185334.jpg

执行JOIN操作:

  1. SELECT
  2.     od.id,
  3.     oi.id order_id,
  4.     oi.user_id,
  5.     oi.province_id,
  6.     od.sku_id,
  7.     od.sku_name,
  8.     od.sku_num,
  9.     od.order_price,
  10.     oi.create_time,
  11.     oi.operate_time
  12. FROM
  13.    (
  14.     SELECT *
  15.     FROM order_info
  16.     WHERE
  17.          order_status = '2'-- 已支付
  18.    ) oi
  19.    JOIN
  20.   (
  21.     SELECT *
  22.     FROM order_detail
  23.   ) od
  24.   ON oi.id = od.order_id;
复制代码

canal-json的操作实践

关于cannal的使用方式,可以参考我的另一篇文章:基于Canal与Flink实现数据实时增量同步(一)。我已经将下面的表通过canal同步到了kafka,具体格式为:

  1. {
  2.     "data":[
  3.         {
  4.             "id":"1",
  5.             "region_name":"华北"
  6.         },
  7.         {
  8.             "id":"2",
  9.             "region_name":"华东"
  10.         },
  11.         {
  12.             "id":"3",
  13.             "region_name":"东北"
  14.         },
  15.         {
  16.             "id":"4",
  17.             "region_name":"华中"
  18.         },
  19.         {
  20.             "id":"5",
  21.             "region_name":"华南"
  22.         },
  23.         {
  24.             "id":"6",
  25.             "region_name":"西南"
  26.         },
  27.         {
  28.             "id":"7",
  29.             "region_name":"西北"
  30.         }
  31.     ],
  32.     "database":"mydw",
  33.     "es":1597128441000,
  34.     "id":102,
  35.     "isDdl":false,
  36.     "mysqlType":{
  37.         "id":"varchar(20)",
  38.         "region_name":"varchar(20)"
  39.     },
  40.     "old":null,
  41.     "pkNames":null,
  42.     "sql":"",
  43.     "sqlType":{
  44.         "id":12,
  45.         "region_name":12
  46.     },
  47.     "table":"base_region",
  48.     "ts":1597128441424,
  49.     "type":"INSERT"
  50. }
复制代码

在SQL CLI中创建该canal-json格式的表:

  1. CREATE TABLE region (
  2.   id BIGINT,
  3.   region_name STRING
  4. ) WITH (
  5. 'connector' = 'kafka',
  6. 'topic' = 'mydw.base_region',
  7. 'properties.bootstrap.servers' = 'kms-3:9092',
  8. 'properties.group.id' = 'testGroup',
  9. 'format' = 'canal-json' ,
  10. 'scan.startup.mode' = 'earliest-offset'
  11. );
复制代码



查询结果如下:
2020-11-04_185408.jpg

changelog-json的操作实践
创建MySQL数据源

参见上面的order_info
Flink SQL Cli创建changelog-json表

  1. CREATE TABLE order_gmv2kafka (
  2.   day_str STRING,
  3.   gmv DECIMAL(10, 5)
  4. ) WITH (
  5.     'connector' = 'kafka',
  6.     'topic' = 'order_gmv_kafka',
  7.     'scan.startup.mode' = 'earliest-offset',
  8.     'properties.bootstrap.servers' = 'kms-3:9092',
  9.     'format' = 'changelog-json'
  10. );
复制代码
  1. INSERT INTO order_gmv2kafka
  2. SELECT DATE_FORMAT(create_time, 'yyyy-MM-dd') as day_str, SUM(total_amount) as gmv
  3. FROM order_info
  4. WHERE order_status = '2' -- 订单已支付
  5. GROUP BY DATE_FORMAT(create_time, 'yyyy-MM-dd');
复制代码

查询表看一下结果:
2020-11-04_185441.jpg

再查一下kafka的数据:

  1. {"data":{"day_str":"2020-06-18","gmv":433},"op":"+I"}
复制代码

当将另外两个订单的状态order_status更新为2时,总金额=443+772+88=1293再观察数据:
2020-11-04_185511.jpg

总结

本文基于Flink1.11的SQL,对新添加的CDC connector的使用方式进行了阐述。主要包括MySQL CDC connector、canal-json及changelog-json的format,并指出了使用过程中的注意点。另外本文给出了完整的使用示例,如果你有现成的环境,那么可以直接进行测试使用。


作者:Jia MaoXiang
来源:https://jiamaoxiang.top/2020/08/12/Flink1-11

最新经典文章,欢迎关注公众号

没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条