分享

什么情况下,Kafka Borker之间重新分配


问题导读
1. Kafka扩容会带来哪些问题?
2. 什么情况下,Broker之间需要重新分配分区?
3. Broker 节点如何实现重新分配?

最新经典文章,欢迎关注公众号

1.文档编写目的
本文档为转载,github地址 https://github.com/fayson/cdhproject

在Kafka集群资源使用已超出系统配置的资源时,需要通过扩容Kafka节点来实现Kafka集群的资源扩容。新增的Kafka节点,只有在创建新的Topic才会参与工作,对于之前其它Broker节点上的分区是不会自动均衡的,不能达到负载的效果。这时需要在Broker之间重新分配分区,本篇文章Fayson主要介绍如何重新分配Topic的partition。

内容概述
1.环境准备及说明
2.重新分配Partition及验证
3.总结

测试环境
1.CM和CDH版本为5.15
2.Kafka版本为0.10.2+kafka2.2.0

2.环境准备及说明

1.现集群Kafka节点数量为3个
1.png


2.在Kafka集群中创建一个测试的Topic,命令如下
[mw_shl_code=bash,true]kafka-topics --create --zookeeper cdh01.fayson.com:2181 --replication-factor 2 --partitions 4 --topic test_partition
[/mw_shl_code]
2.png

该Topic为2个副本共4个Partition,通过Kafka命令查看该Topic的详细信息
[mw_shl_code=bash,true]kafka-topics --zookeeper cdh01.fayson.com:2181 --describe --topic test_partition
[/mw_shl_code]

可以看到test_partition的4个Partition分布在三个Broker上。

1.png

3.向测试test_partition生产部分测试数据
1.png
测试脚本可以在Fayson的Github上下载。
https://github.com/fayson/cdhproject/tree/master/kafkademo/0283-kafka-shell
4.为Kafka集群扩容一个新的Broker(通过CM添加一个Kafka角色并启动即可)
1.png

在Kafka的配置界面搜索“broker.id”,查看Kafka所有节点的BrokerID,在后面的Partition重新分配时会用到

1.png

5.使用Kafka命令查看Topic的详细描述

可以看到Kafka的Partition仍然分配在之前的三个Broker上并未因为新增Broker而进行自动均衡
1.png

6.创建一个新的Topic,查看Partition的分布情况

1.png

可以看到新创建的Topic,相应的Partition会分布在新的Broker节点上,对于之前的Topic的Partition不会自动均衡到新的节点上,因此之前Topic的压力还在旧的节点上,接下来Fayson会说明如何使用Kafka命令重新分配Kafka的Partition。

3.重新分配Partition
以我们上面创建的test_partition为例来说明,如何重新分配该Topic的Partition。

1.在重新分配Partition前登录新扩容的Broker节点查看数据目录
1.png
可以看到没有关于test_partition Topic的分区目录。

2.创建一个待重新分配的Topic的JSON文件,内容如下
[mw_shl_code=bash,true][root@cdh01 kafka_partition_reassignment]# vim topics-to-move.json
{
  "topics": [{
     "topic": "test_partition"
   }],
   "version": 1
}[/mw_shl_code]
这里也可以同时为多个Topic进行重新分配,在json文件中添加多个topic即可。

3.使用kafka-reassign-partitions命令生成一个分配计划,下面的命令行中broker-list参数即对应上面的BrokerID
[mw_shl_code=bash,true]kafka-reassign-partitions --zookeeper cdh01.fayson.com:2181 \
    --topics-to-move-json-file topics-to-move.json \
    --broker-list "110,111,109,190" \
    --generate[/mw_shl_code]
1.png
可以看到在命令行生成的Partition重分配的计划(这里还没有真正的去分配,只是生成了分配计划),在截图中也很清楚的标出了分配前和分配后的结果,注意这里我们需要使用到分配计划生成的结果(即重新分配后Partition的JSON字符串)。

4.将分配计划的第二个JSON保存至reassignment.json文件中
[mw_shl_code=bash,true][root@cdh01 kafka_partition_reassignment]# vim reassignment.json
{"version":1,"partitions":[{"topic":"test_partition","partition":1,"replicas":[109,111]},{"topic":"test_partition","partiti
on":0,"replicas":[190,110]},{"topic":"test_partition","partition":3,"replicas":[111,109]},{"topic":"test_partition","partit
ion":2,"replicas":[110,190]}]}[/mw_shl_code]
1.png

5.运行kafka-reassign-partition命令根据上述执行计划生成的结果进行分配,命令如下:
[mw_shl_code=bash,true]kafka-reassign-partitions \
    --zookeeper cdh01.fayson.com:2181 \
    --reassignment-json-file reassignment.json \
    --execute[/mw_shl_code]
1.png

6.对于数据量比较大的Topic运行重新分配会比较耗时,这时可以通过如下命令查看Topic的重分配情况,命令如下:
[mw_shl_code=bash,true]kafka-reassign-partitions \
    --zookeeper cdh01.fayson.com:2181 \
    --reassignment-json-file reassignment.json \
    --execute[/mw_shl_code]

1.png

通过该命令可以看到每个Partition的分配进度。

4.验证Partition分配情况

可以通过kafka-topics命令查看指定Topic的描述信息,操作如下:

1.在命令行执行如下命令,查看Topic的描述信息
[mw_shl_code=bash,true]kafka-topics --zookeeper cdh01.fayson.com:2181 --describe --topic test_partition
[/mw_shl_code]
上图可以看到Topic的Partition0和2已经重新分配到ID为190的Broker节点上了,该节点为新扩容节点。

1.png

2.登录该节点查看Kafka的数据目录

1.png

可以看到新增的Broker节点上,跟分配计划一致,将test_partition0和2的数据移动至该节点(包括Partition的数据)。

5.总结

1.在进行Kafka集群扩容后,需要考虑为原有的Topic分区进行重新分配,否则新增节点是不会负载扩容前已存在的Topic。

2.重新分配的命令中broker-list参数可以指定多个BrokerID(注意这里是ID而不是Broker的ip或则hostname)。在指定broker-list时,如果不想partition分配到某个Broker时则list中不要有对应节点的BrokerID即可。

3.kafka-reassign-partitions命令是针对Partition进行重新分配,而不能将整个Topic的数据重新均衡到所有的Partition中。


原文链接
作者:Fayson
来自公众号:Hadoop实操

没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条