分享

Maxwell:实时同步MySQL数据到消息队列Kafka

levycui 2020-12-22 19:50:58 发表于 实操演练 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 0 4540
本帖最后由 levycui 于 2020-12-22 20:26 编辑
问题导读:
1、Maxwell与Canal相比,具有哪些特点?
2、Maxwell如何配置?
3、抓取数据与Canal有哪些不同?
4、如何实现全量同步数据?



maxwell 是由美国zendesk开源,用java编写的Mysql实时抓取软件。其抓取的原理也是基于binlog。

一 工具对比


Maxwell与Canal相比,具有以下特点:
  •     Maxwell 没有 Canal那种server+client模式,只有一个server把数据发送到消息队列或redis。
  •     Maxwell 有一个亮点功能,就是Canal只能抓取最新数据,对已存在的历史数据没有办法处理。而Maxwell有一个bootstrap功能,可以直接引导出完整的历史数据用于初始化,非常好用。
  •     Maxwell不能直接支持HA,但是它支持断点还原,即错误解决后重启继续上次点儿读取数据。
  •     Maxwell只支持json格式,而Canal如果用Server+client模式的话,可以自定义格式。
  •     Maxwell比Canal更加轻量级。


二 安装与准备

1.  解压缩maxwell-1.25.0.tar.gz 到某个目录下。

2.  在数据库中建立一个maxwell库用于存储Maxwell的元数据。
  1. CREATE DATABASE maxwell;
复制代码

3.  并且分配一个账号可以操作该数据库

  1. GRANT ALL ON maxwell.* TO 'maxwell'@'%' IDENTIFIED BY '123123';
复制代码

4.  分配这个账号可以监控其他数据库的权限
  1. GRANT SELECT ,REPLICATION SLAVE , REPLICATION CLIENT ON *.* TO maxwell@'%';
复制代码


三 配置

在任意位置建立maxwell.properties 文件,配置如下:
  1. producer=kafka
  2. kafka.bootstrap.servers=hadoop1:9092,hadoop2:9092,hadoop3:9092
  3. kafka_topic=ODS_DB_GMALL2020_M
  4. host=hadoop2
  5. user=maxwell
  6. password=123123
  7. producer_partition_by=primary_key
  8. client_id=maxwell_1
复制代码

maxwell还可以通过配置过滤器来决定对哪些数据库和表进行监控抓取,如下:

例1,监控所有foodb库下的数据变动,不包括所有 foodb.tbl表下的变动以及符合foodb./table_\d+/'模式的表下的变动。
  1. filter = 'exclude: foodb.*, include: foodb.tbl, include: foodb./table_\d+/'
复制代码

例2,监控所有库所有表的数据变动,不包括db1库下的所有表的数据变动。
  1. filter = 'exclude: *.*, include: db1.*'
复制代码

更详细的过滤器配置,详见官网:http://maxwells-daemon.io/filtering/

启动程序:
  1. nohup /ext/maxwell-1.25.0/bin/maxwell --config /xxx/xxxx/maxwell.properties >/dev/null 2>&1 &
复制代码


四 抓取数据与Canal进行对比


1.  在kafka下创建对应tipic,用于接收Maxwell发送的数据变动情况:

  1. bin/kafka-topics.sh --create --topic ODS_DB_GMALL2020_M --zookeeperhadoop1:2181,hadoop2:2181,hadoop3:2181 --partitions 12 --replication-factor 1
复制代码

2.  执行插入测试语句:

  1. INSERT INTO z_user_info VALUES(30,'zhang3','13810001010'),(31,'li4','1389999999');
复制代码

canal接收到的数据变动情况:
  1. {"data":[{"id":"30","user_name":"zhang3","tel":"13810001010"},{"id":"31","user_name":"li4","tel":"1389999999"}],"database":"gmall-2020-04","es":1589385314000,"id":2,"isDdl":false,"mysqlType":{"id":"bigint(20)","user_name":"varchar(20)","tel":"varchar(20)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":-5,"user_name":12,"tel":12},"table":"z_user_info","ts":1589385314116,"type":"INSERT"}
复制代码

maxwell接收到的数据变动情况:

  1. {"database":"gmall-2020-04","table":"z_user_info","type":"insert","ts":1589385314,"xid":82982,"xoffset":0,"data":{"id":30,"user_name":"zhang3","tel":"13810001010"}}
  2. {"database":"gmall-2020-04","table":"z_user_info","type":"insert","ts":1589385314,"xid":82982,"commit":true,"data":{"id":31,"user_name":"li4","tel":"1389999999"}}
复制代码

3.  执行update操作:

  1. UPDATE z_user_info SET user_name='wang55' WHERE id IN(30,31);
复制代码

canal接收到的数据变动情况:

  1. {"data":[{"id":"30","user_name":"wang55","tel":"13810001010"},{"id":"31","user_name":"wang55","tel":"1389999999"}],"database":"gmall-2020-04","es":1589385508000,"id":3,"isDdl":false,"mysqlType":{"id":"bigint(20)","user_name":"varchar(20)","tel":"varchar(20)"},"old":[{"user_name":"zhang3"},{"user_name":"li4"}],"pkNames":["id"],"sql":"","sqlType":{"id":-5,"user_name":12,"tel":12},"table":"z_user_info","ts":1589385508676,"type":"UPDATE"}
复制代码

maxwell接收到的数据变动情况:
  1. {"database":"gmall-2020-04","table":"z_user_info","type":"update","ts":1589385508,"xid":83206,"xoffset":0,"data":{"id":30,"user_name":"wang55","tel":"13810001010"},"old":{"user_name":"zhang3"}}
  2. {"database":"gmall-2020-04","table":"z_user_info","type":"update","ts":1589385508,"xid":83206,"commit":true,"data":{"id":31,"user_name":"wang55","tel":"1389999999"},"old":{"user_name":"li4"}}
复制代码

4.  执行delete操作:

  1. DELETE  FROM z_user_info   WHERE id IN(30,31)
复制代码

canal接收到的数据变动情况:

  1. {"data":[{"id":"30","user_name":"wang55","tel":"13810001010"},{"id":"31","user_name":"wang55","tel":"1389999999"}],"database":"gmall-2020-04","es":1589385644000,"id":4,"isDdl":false,"mysqlType":{"id":"bigint(20)","user_name":"varchar(20)","tel":"varchar(20)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":-5,"user_name":12,"tel":12},"table":"z_user_info","ts":1589385644829,"type":"DELETE"}
复制代码

maxwell接收到的数据变动情况:

  1. {"database":"gmall-2020-04","table":"z_user_info","type":"delete","ts":1589385644,"xid":83367,"xoffset":0,"data":{"id":30,"user_name":"wang55","tel":"13810001010"}}
  2. {"database":"gmall-2020-04","table":"z_user_info","type":"delete","ts":1589385644,"xid":83367,"commit":true,"data":{"id":31,"user_name":"wang55","tel":"1389999999"}}
复制代码

总结数据特点:

1. 日志结构

canal 每一条SQL会产生一条日志,如果该条Sql影响了多行数据,则已经会通过集合的方式归集在这条日志中。(即使是一条数据也会是数组结构)

maxwell 以影响的数据为单位产生日志,即每影响一条数据就会产生一条日志。如果想知道这些日志是否是通过某一条sql产生的可以通过xid进行判断,相同的xid的日志来自同一sql。

2. 数字类型

当原始数据是数字类型时,maxwell会尊重原始数据的类型不增加双引,变为字符串。

canal一律转换为字符串。

3. 带原始数据字段定义

canal数据中会带入表结构。maxwell更简洁。


五 全量同步数据


使用maxwell-bootstrap命令全量同步数据,命令如下:

  1. bin/maxwell-bootstrap --user maxwell --password 123123 --host hdp1 --database gmall0105 --table table_name --client_id maxwell_1
复制代码

其中client_id 是指另一个已启动的maxwell监控进程的client_id.


六 Maxwell发送数据解析

Maxwell监控的数据变动情况不能直接使用,需要对数据进行解析,提取关键信息,具体解析步骤如下:
  1. <div>def main(args: Array[String]): Unit = {
  2.   val sparkConf: SparkConf = new SparkConf().setAppName("ods_gmall_maxwell_app").setMaster("local[4]")
  3.   val ssc = new StreamingContext(sparkConf, Seconds(5))
  4.   val topic = "ODS_DB_GMALL0317_M"
  5.   val groupId = "ods_gmall_maxwell_group"
  6.   var inputDstream: InputDStream[ConsumerRecord[String, String]]=null
  7.   val offsetMap: Map[TopicPartition, Long] = OffsetManager.getOffset(topic,groupId )
  8.   //2、 把偏移量交给kafka ,让kafka按照偏移量的位置读取数据流
  9.   if(offsetMap!=null){
  10.     inputDstream = MyKafkaUtil.getKafkaStream(topic, ssc,offsetMap, groupId)
  11.   }else{
  12.     inputDstream = MyKafkaUtil.getKafkaStream(topic, ssc,  groupId)
  13.   }
  14.   //3、  获得偏移量的结束位置
  15.   //从流中rdd 获得偏移量的结束点 数组
  16.   var offsetRanges: Array[OffsetRange]=null
  17.   val inputWithOffsetDstream: DStream[ConsumerRecord[String, String]] = inputDstream.transform { rdd =>
  18.     offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  19.     rdd
  20.   }
  21.   val jsonObjDstream: DStream[JSONObject] = inputWithOffsetDstream.map { record =>
  22.     val jsonString: String = record.value()
  23.     JSON.parseObject(jsonString)
  24.   }
  25.   // 数据筛选 , 分流
  26.   jsonObjDstream.foreachRDD{rdd=>
  27.     rdd.foreach{jsonObj=>
  28.       val dataJson: String = jsonObj.getString("data")
  29.       val table: String = jsonObj.getString("table")
  30.       val topic="ODS_T_"+table.toUpperCase
  31.       println(topic+"::"+dataJson)
  32.       if(dataJson!=null&&dataJson.size>2&&jsonObj.getString("type")!=null&&(
  33.         (table=="order_info"&&jsonObj.getString("type")=="insert")
  34.      || (table=="order_detail"&&jsonObj.getString("type")=="insert")
  35.      || (table=="base_province"&&(jsonObj.getString("type")=="insert"||jsonObj.getString("type")=="update")||jsonObj.getString("type")=="bootstrap-insert")
  36.      || (table=="user_info"&&(jsonObj.getString("type")=="insert"||jsonObj.getString("type")=="update")||jsonObj.getString("type")=="bootstrap-insert")
  37.         )){
  38.        // Thread.sleep(200)
  39.         MyKafkaSender.send(topic,dataJson)
  40.       }
  41.     }
  42.     OffsetManager.saveOffset(topic,groupId,offsetRanges)
  43.   }
  44.   ssc.start()
  45.   ssc.awaitTermination()
  46. }</div>
复制代码

作者:扫地僧
来源:https://mp.weixin.qq.com/s/EjpC_B597IxgyGJ5YWomtQ

最新经典文章,欢迎关注公众号

没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条