
Kafka multi-threaded consumption (same group)

I've recently been working on multi-threaded Kafka consumption.

I ran into a strange problem.

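According to Kafka's documentation, consumers in the same group should each process different data, but while debugging my program I got the following result:
Note:
   Topic FREIGHT has two partitions; one of them has no data and the other does. I started two threads to process them.
What I expected: one thread keeps receiving data, while the other gets nothing and just keeps waiting for data to arrive.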

Could someone please help clear this up?
Results (full log):

2016-08-30 11:37:02.642  INFO 5324 --- [pool-1-thread-1] o.a.kafka.common.utils.AppInfoParser     : Kafka version : 0.10.0.0
2016-08-30 11:37:02.642  INFO 5324 --- [pool-1-thread-2] o.a.kafka.common.utils.AppInfoParser     : Kafka version : 0.10.0.0
2016-08-30 11:37:02.642  INFO 5324 --- [pool-1-thread-1] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : b8642491e78c5a13
2016-08-30 11:37:02.642  INFO 5324 --- [pool-1-thread-2] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : b8642491e78c5a13
2016-08-30 11:37:02.795  INFO 5324 --- [pool-1-thread-1] o.a.k.c.c.internals.AbstractCoordinator  : Discovered coordinator (id: 2147483647 rack: null) for group FREIGHT.
2016-08-30 11:37:02.795  INFO 5324 --- [pool-1-thread-2] o.a.k.c.c.internals.AbstractCoordinator  : Discovered coordinator  (id: 2147483647 rack: null) for group FREIGHT.
2016-08-30 11:37:02.796  INFO 5324 --- [pool-1-thread-1] o.a.k.c.c.internals.ConsumerCoordinator  : Revoking previously assigned partitions [] for group FREIGHT
2016-08-30 11:37:02.796  INFO 5324 --- [pool-1-thread-2] o.a.k.c.c.internals.ConsumerCoordinator  : Revoking previously assigned partitions [] for group FREIGHT
2016-08-30 11:37:02.796  INFO 5324 --- [pool-1-thread-1] o.a.k.c.c.internals.AbstractCoordinator  : (Re-)joining group FREIGHT
2016-08-30 11:37:02.796  INFO 5324 --- [pool-1-thread-2] o.a.k.c.c.internals.AbstractCoordinator  : (Re-)joining group FREIGHT
2016-08-30 11:37:02.879  INFO 5324 --- [pool-1-thread-1] o.a.k.c.c.internals.AbstractCoordinator  : (Re-)joining group FREIGHT
2016-08-30 11:37:02.948  INFO 5324 --- [pool-1-thread-1] o.a.k.c.c.internals.AbstractCoordinator  : Successfully joined group FREIGHT with generation 2
2016-08-30 11:37:02.951  INFO 5324 --- [pool-1-thread-1] o.a.k.c.c.internals.ConsumerCoordinator  : Setting newly assigned partitions [FREIGHT-0] for group FREIGHT
2016-08-30 11:37:02.972  INFO 5324 --- [pool-1-thread-2] o.a.k.c.c.internals.AbstractCoordinator  : Successfully joined group FREIGHT with generation 2
2016-08-30 11:37:02.973  INFO 5324 --- [pool-1-thread-2] o.a.k.c.c.internals.ConsumerCoordinator  : Setting newly assigned partitions [FREIGHT-1] for group FREIGHT
2016-08-30 11:37:08.108  INFO 5324 --- [pool-1-thread-2] c.x.trace.storage.HbaseStorageService    : FREIGHT-----1-----0
2016-08-30 11:37:10.411  INFO 5324 --- [pool-1-thread-2] c.x.trace.storage.HbaseStorageService    : FREIGHT-----1-----1
2016-08-30 11:37:11.259  INFO 5324 --- [pool-1-thread-2] c.x.trace.storage.HbaseStorageService    : FREIGHT-----1-----2
2016-08-30 11:37:11.794  INFO 5324 --- [pool-1-thread-2] c.x.trace.storage.HbaseStorageService    : FREIGHT-----1-----3
2016-08-30 11:37:12.187  INFO 5324 --- [pool-1-thread-2] c.x.trace.storage.HbaseStorageService    : FREIGHT-----1-----4
2016-08-30 11:37:12.561  INFO 5324 --- [pool-1-thread-2] c.x.trace.storage.HbaseStorageService    : FREIGHT-----1-----5
2016-08-30 11:37:12.985  INFO 5324 --- [pool-1-thread-2] c.x.trace.storage.HbaseStorageService    : FREIGHT-----1-----6
2016-08-30 11:37:25.634  INFO 5324 --- [pool-1-thread-2] c.x.trace.storage.HbaseStorageService    : FREIGHT-----1-----7
2016-08-30 11:37:26.786  INFO 5324 --- [pool-1-thread-2] c.x.trace.storage.HbaseStorageService    : FREIGHT-----1-----8
2016-08-30 11:37:27.490  INFO 5324 --- [pool-1-thread-2] c.x.trace.storage.HbaseStorageService    : FREIGHT-----1-----9
2016-08-30 11:37:28.111  INFO 5324 --- [pool-1-thread-2] c.x.trace.storage.HbaseStorageService    : FREIGHT-----1-----10
2016-08-30 11:37:29.487  INFO 5324 --- [pool-1-thread-2] c.x.trace.storage.HbaseStorageService    : FREIGHT-----1-----11
2016-08-30 11:37:29.913  INFO 5324 --- [pool-1-thread-2] c.x.trace.storage.HbaseStorageService    : FREIGHT-----1-----12
2016-08-30 11:37:30.317  INFO 5324 --- [pool-1-thread-2] c.x.trace.storage.HbaseStorageService    : FREIGHT-----1-----13
2016-08-30 11:37:31.890  INFO 5324 --- [pool-1-thread-2] c.x.trace.storage.HbaseStorageService    : FREIGHT-----1-----14
2016-08-30 11:37:32.419  INFO 5324 --- [pool-1-thread-2] c.x.trace.storage.HbaseStorageService    : FREIGHT-----1-----15
2016-08-30 11:37:32.890  INFO 5324 --- [pool-1-thread-2] c.x.trace.storage.HbaseStorageService    : FREIGHT-----1-----16
2016-08-30 11:37:33.306  INFO 5324 --- [pool-1-thread-2] c.x.trace.storage.HbaseStorageService    : FREIGHT-----1-----17
2016-08-30 11:37:33.739  INFO 5324 --- [pool-1-thread-2] c.x.trace.storage.HbaseStorageService    : FREIGHT-----1-----18
2016-08-30 11:37:34.200  INFO 5324 --- [pool-1-thread-2] c.x.trace.storage.HbaseStorageService    : FREIGHT-----1-----19
2016-08-30 11:37:35.315  INFO 5324 --- [pool-1-thread-2] c.x.trace.storage.HbaseStorageService    : FREIGHT-----1-----20
2016-08-30 11:37:35.762  INFO 5324 --- [pool-1-thread-2] c.x.trace.storage.HbaseStorageService    : FREIGHT-----1-----21
2016-08-30 11:37:35.994  INFO 5324 --- [pool-1-thread-1] o.a.k.c.c.internals.ConsumerCoordinator  : Revoking previously assigned partitions [FREIGHT-0] for group FREIGHT
2016-08-30 11:37:35.994  INFO 5324 --- [pool-1-thread-1] o.a.k.c.c.internals.AbstractCoordinator  : (Re-)joining group FREIGHT
2016-08-30 11:37:36.221  INFO 5324 --- [pool-1-thread-2] c.x.trace.storage.HbaseStorageService    : FREIGHT-----1-----22
2016-08-30 11:37:36.426  INFO 5324 --- [pool-1-thread-1] o.a.k.c.c.internals.AbstractCoordinator  : Successfully joined group FREIGHT with generation 3
2016-08-30 11:37:36.427  INFO 5324 --- [pool-1-thread-1] o.a.k.c.c.internals.ConsumerCoordinator  : Setting newly assigned partitions [FREIGHT-1, FREIGHT-0] for group FREIGHT
2016-08-30 11:37:36.649  INFO 5324 --- [pool-1-thread-2] c.x.trace.storage.HbaseStorageService    : FREIGHT-----1-----23
2016-08-30 11:37:37.064  INFO 5324 --- [pool-1-thread-2] c.x.trace.storage.HbaseStorageService    : FREIGHT-----1-----24
2016-08-30 11:37:37.520  INFO 5324 --- [pool-1-thread-2] c.x.trace.storage.HbaseStorageService    : FREIGHT-----1-----25
2016-08-30 11:37:39.409  INFO 5324 --- [pool-1-thread-2] c.x.trace.storage.HbaseStorageService    : FREIGHT-----1-----26
2016-08-30 11:37:40.162  INFO 5324 --- [pool-1-thread-2] c.x.trace.storage.HbaseStorageService    : FREIGHT-----1-----27
2016-08-30 11:37:41.673  INFO 5324 --- [pool-1-thread-2] c.x.trace.storage.HbaseStorageService    : FREIGHT-----1-----28
2016-08-30 11:37:42.633  INFO 5324 --- [pool-1-thread-2] c.x.trace.storage.HbaseStorageService    : FREIGHT-----1-----29
2016-08-30 11:37:43.483  INFO 5324 --- [pool-1-thread-2] c.x.trace.storage.HbaseStorageService    : FREIGHT-----1-----30
2016-08-30 11:37:44.490  INFO 5324 --- [pool-1-thread-2] c.x.trace.storage.HbaseStorageService    : FREIGHT-----1-----31
2016-08-30 11:37:46.713  INFO 5324 --- [pool-1-thread-2] c.x.trace.storage.HbaseStorageService    : FREIGHT-----1-----32
2016-08-30 11:37:47.560  INFO 5324 --- [pool-1-thread-2] c.x.trace.storage.HbaseStorageService    : FREIGHT-----1-----33
2016-08-30 11:37:50.578  INFO 5324 --- [pool-1-thread-2] c.x.trace.storage.HbaseStorageService    : FREIGHT-----1-----34
2016-08-30 11:37:51.850  INFO 5324 --- [pool-1-thread-2] c.x.trace.storage.HbaseStorageService    : FREIGHT-----1-----35
2016-08-30 11:37:52.562  INFO 5324 --- [pool-1-thread-2] c.x.trace.storage.HbaseStorageService    : FREIGHT-----1-----36
2016-08-30 11:37:53.178  INFO 5324 --- [pool-1-thread-2] c.x.trace.storage.HbaseStorageService    : FREIGHT-----1-----37
2016-08-30 11:37:53.714  INFO 5324 --- [pool-1-thread-1] c.x.trace.storage.HbaseStorageService    : FREIGHT-----1-----0
2016-08-30 11:37:53.976  INFO 5324 --- [pool-1-thread-2] c.x.trace.storage.HbaseStorageService    : FREIGHT-----1-----38
2016-08-30 11:37:54.010  INFO 5324 --- [pool-1-thread-1] c.x.trace.storage.HbaseStorageService    : FREIGHT-----1-----1
2016-08-30 11:37:54.971  INFO 5324 --- [pool-1-thread-2] c.x.trace.storage.HbaseStorageService    : FREIGHT-----1-----39
2016-08-30 11:37:55.307  INFO 5324 --- [pool-1-thread-1] c.x.trace.storage.HbaseStorageService    : FREIGHT-----1-----2
2016-08-30 11:37:57.147  INFO 5324 --- [pool-1-thread-2] c.x.trace.storage.HbaseStorageService    : FREIGHT-----1-----40
2016-08-30 11:37:57.456  INFO 5324 --- [pool-1-thread-1] c.x.trace.storage.HbaseStorageService    : FREIGHT-----1-----3
2016-08-30 11:37:57.878  INFO 5324 --- [pool-1-thread-2] c.x.trace.storage.HbaseStorageService    : FREIGHT-----1-----41
2016-08-30 11:37:58.115  INFO 5324 --- [pool-1-thread-1] c.x.trace.storage.HbaseStorageService    : FREIGHT-----1-----4
2016-08-30 11:37:58.504  INFO 5324 --- [pool-1-thread-2] c.x.trace.storage.HbaseStorageService    : FREIGHT-----1-----42
2016-08-30 11:37:58.911  INFO 5324 --- [pool-1-thread-2] c.x.trace.storage.HbaseStorageService    : FREIGHT-----1-----43
2016-08-30 11:37:59.568  INFO 5324 --- [pool-1-thread-1] c.x.trace.storage.HbaseStorageService    : FREIGHT-----1-----5


3 replies

desehawk, posted 2016-8-30 15:58:28
2016-08-30 11:37:02.951  INFO 5324 --- [pool-1-thread-1] o.a.k.c.c.internals.ConsumerCoordinator  : Setting newly assigned partitions [FREIGHT-0] for group FREIGHT
2016-08-30 11:37:02.972  INFO 5324 --- [pool-1-thread-2] o.a.k.c.c.internals.AbstractCoordinator  : Successfully joined group FREIGHT with generation 2
2016-08-30 11:37:02.973  INFO 5324 --- [pool-1-thread-2] o.a.k.c.c.internals.ConsumerCoordinator  : Setting newly assigned partitions [FREIGHT-1] for group FREIGHT

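Look at the quoted lines above: it seems to be behaving the way the original poster intended.
The two consumers in the same group are assigned two different partitions. At first, partition [FREIGHT-1] has data, so its consumer, [pool-1-thread-2], starts consuming.
Once partition [FREIGHT-0] has data, [pool-1-thread-1] starts consuming.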
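To make "different consumers get different partitions" concrete, here is a minimal sketch of range-style assignment for a single topic. It assumes the range strategy (RangeAssignor, the default `partition.assignment.strategy` in the 0.10 client); the function `range_assign` and the member labels are hypothetical. Real member ids are generated by the client, so which thread ends up with FREIGHT-0 is not guaranteed.

```python
# Minimal sketch of range-style partition assignment for one topic.
# Assumption: models the range strategy (RangeAssignor); member labels are hypothetical.


def range_assign(topic: str, num_partitions: int, members: list[str]) -> dict[str, list[str]]:
    """Give each group member a contiguous, non-overlapping range of partitions."""
    if not members:
        raise ValueError("a group needs at least one member")
    ordered = sorted(members)  # members are sorted by id before assignment
    per_member, extra = divmod(num_partitions, len(ordered))
    assignment: dict[str, list[str]] = {}
    for i, member in enumerate(ordered):
        # The first `extra` members each take one additional partition.
        start = per_member * i + min(i, extra)
        length = per_member + (1 if i < extra else 0)
        assignment[member] = [f"{topic}-{p}" for p in range(start, start + length)]
    return assignment


if __name__ == "__main__":
    # Two members, two partitions: one partition each.
    print(range_assign("FREIGHT", 2, ["thread-1", "thread-2"]))
    # After one member leaves the group, the survivor owns both partitions.
    print(range_assign("FREIGHT", 2, ["thread-1"]))
```

Each partition lands in exactly one member's list. This is why no duplicates are expected while group membership is stable, and why a single remaining member picks up every partition.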


linjikai8888, posted 2016-8-31 08:40:41
desehawk posted on 2016-8-30 15:58:
2016-08-30 11:37:02.951  INFO 5324 ---  o.a.k.c.c.internals.ConsumerCoordinator  : Setting newly ass ...

The beginning is fine.

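I forgot to mention: the trailing part of each log message has the form TOPIC-PARTITION-OFFSET (for example, FREIGHT-----1-----0 means topic FREIGHT, partition 1, offset 0).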

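Further down you can see the other thread start fetching data as well, so the same records get consumed twice (from 11:37:53.714 on, pool-1-thread-1 reads partition 1 again starting at offset 0).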
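With that format, duplicates can be spotted mechanically. This minimal sketch assumes every storage line ends in `TOPIC-----PARTITION-----OFFSET` and names the thread in square brackets, as in the posted log. `find_duplicates` is a hypothetical helper and is not part of any Kafka tooling.

```python
# Sketch: find records that more than one thread consumed, from the posted log format.
# Assumption: storage lines end in TOPIC-----PARTITION-----OFFSET and name the thread in [...].
import re
from collections import defaultdict

LINE_RE = re.compile(
    r"\[(?P<thread>[^\]]+)\].*:\s+"
    r"(?P<topic>\w+)-----(?P<partition>\d+)-----(?P<offset>\d+)\s*$"
)


def find_duplicates(lines):
    """Map (topic, partition, offset) to the sorted thread names that consumed it, if 2+."""
    seen = defaultdict(set)
    for line in lines:
        m = LINE_RE.search(line)
        if m is None:
            continue  # coordinator and startup lines do not match and are skipped
        key = (m["topic"], int(m["partition"]), int(m["offset"]))
        seen[key].add(m["thread"])
    return {key: sorted(threads) for key, threads in seen.items() if len(threads) > 1}


if __name__ == "__main__":
    sample = [
        "2016-08-30 11:37:08.108  INFO 5324 --- [pool-1-thread-2] c.x.trace.storage.HbaseStorageService    : FREIGHT-----1-----0",
        "2016-08-30 11:37:10.411  INFO 5324 --- [pool-1-thread-2] c.x.trace.storage.HbaseStorageService    : FREIGHT-----1-----1",
        "2016-08-30 11:37:53.714  INFO 5324 --- [pool-1-thread-1] c.x.trace.storage.HbaseStorageService    : FREIGHT-----1-----0",
    ]
    print(find_duplicates(sample))
```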

Ideally, that thread should simply do nothing. With a single thread there is definitely no problem.

linjikai8888, posted 2016-9-8 09:59:53
Consumer liveness

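Each consumer in a consumer group is assigned a subset of the partitions of the topics it subscribes to. This is effectively a group lock on those partitions.
As long as the lock is held, no other member of the group reads from them (translator's note: each partition belongs to exactly one consumer, and only that consumer holds its lock).
When your consumer is healthy, this is exactly what you want, because it is the only way to avoid duplicate consumption.
But if the consumer fails, the lock has to be released so that its partitions can be assigned to a healthy member.
Kafka's group coordination protocol solves this with a heartbeat mechanism. After every rebalance, all members of the current generation periodically send heartbeats to the group coordinator.
As long as the coordinator keeps receiving heartbeats, it assumes the member is healthy. On every heartbeat it receives, the coordinator starts (or resets) a timer.
If the timer expires without a heartbeat from the consumer, the coordinator marks that consumer as dead and tells the other members of the group to rejoin so the partitions can be reassigned.
The length of this timer is the session timeout, configured in the client application with session.timeout.ms.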
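The timer logic just described fits in a few lines. This toy sketch runs on a logical clock in milliseconds and assumes a simplified coordinator that tracks only session deadlines. The class `SessionTracker` is hypothetical, and the real coordinator's join/sync phases and generations are left out.

```python
# Toy model of the coordinator's per-member session timer, on a logical clock in ms.
# Assumption: a simplification; join/sync phases and generations are omitted.
from dataclasses import dataclass, field


@dataclass
class SessionTracker:
    session_timeout_ms: int
    deadlines: dict[str, int] = field(default_factory=dict)  # member -> expiry time

    def heartbeat(self, member: str, now_ms: int) -> None:
        """Each heartbeat starts (or resets) the member's timer."""
        self.deadlines[member] = now_ms + self.session_timeout_ms

    def expire(self, now_ms: int) -> list[str]:
        """Drop and return members whose timer ran out; a non-empty result means a rebalance."""
        dead = sorted(m for m, deadline in self.deadlines.items() if now_ms >= deadline)
        for member in dead:
            del self.deadlines[member]
        return dead


if __name__ == "__main__":
    tracker = SessionTracker(session_timeout_ms=30_000)
    tracker.heartbeat("thread-1", 0)
    tracker.heartbeat("thread-2", 0)
    tracker.heartbeat("thread-1", 20_000)  # only thread-1 keeps heartbeating
    print(tracker.expire(30_000))  # thread-2's 30 s session has run out
```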
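The session timeout ensures the lock is released if the application crashes or a network partition isolates the consumer from the coordinator.
Application failures where the process is still alive are trickier: the consumer may still be sending heartbeats to the coordinator, but that does not mean the application is healthy.
The consumer's poll loop is designed to handle this. All network I/O happens in the foreground, when you call poll or one of the other blocking APIs.
The consumer does not use any background threads, which means heartbeats are sent to the coordinator only when you call poll.
If the application stops polling (whether because the processing code threw an exception or a downstream system crashed), no more heartbeats are sent,
the session eventually times out (the timer expires without a heartbeat), and the group starts a rebalance.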
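Because heartbeats are tied to poll() in this design, what matters is the gap between consecutive polls. The sketch below assumes the foreground-only heartbeat behaviour described above (the client in the log is 0.10.0.0). The helper names and batch timings are hypothetical, and time spent waiting inside poll() is ignored.

```python
# Sketch: when heartbeats are sent only from poll(), a slow batch silences the consumer.
# Assumption: foreground-only heartbeats as described above; all timings are hypothetical.
from typing import Optional


def poll_times(batch_processing_ms: list[int]) -> list[int]:
    """Times (ms) of successive poll() calls, starting at t=0; time spent inside poll() is ignored."""
    times = [0]
    for cost in batch_processing_ms:
        times.append(times[-1] + cost)  # the next poll happens only after the batch is done
    return times


def first_session_expiry(batch_processing_ms: list[int], session_timeout_ms: int = 30_000) -> Optional[int]:
    """Time at which the session would expire, or None if every gap between polls is short enough."""
    times = poll_times(batch_processing_ms)
    for prev, nxt in zip(times, times[1:]):
        if nxt - prev >= session_timeout_ms:
            return prev + session_timeout_ms  # no heartbeat between prev and nxt
    return None


if __name__ == "__main__":
    print(first_session_expiry([2_000, 4_000]))            # short batches: None
    print(first_session_expiry([2_000, 40_000]))           # 40 s batch: expires at 32000
    print(first_session_expiry([2_000, 40_000], 120_000))  # longer timeout: None
```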
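The only catch is that if a consumer takes longer than the session timeout to process its messages, a spurious rebalance is triggered.
You can prevent this by setting a longer session timeout. The default is 30 seconds, and setting it to several minutes is not unreasonable.
The downside of a longer session timeout is that the coordinator takes longer to detect consumers that have genuinely crashed.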
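Here is a rough worked check of this explanation against the posted log. It assumes thread-2 last called poll() when it joined the group at 11:37:02.973 and was busy processing records after that. The log neither confirms nor rules this out. `gap_seconds` is a hypothetical helper.

```python
# Worked check: time between thread-2's join and the rebalance seen by thread-1.
# Assumption: thread-2 did not poll again after joining; the log does not show this directly.
from datetime import datetime

FMT = "%Y-%m-%d %H:%M:%S.%f"  # timestamp format used in the posted log


def gap_seconds(start: str, end: str) -> float:
    """Seconds between two log timestamps."""
    return (datetime.strptime(end, FMT) - datetime.strptime(start, FMT)).total_seconds()


if __name__ == "__main__":
    joined = "2016-08-30 11:37:02.973"   # thread-2: Setting newly assigned partitions [FREIGHT-1]
    revoked = "2016-08-30 11:37:35.994"  # thread-1: Revoking previously assigned partitions [FREIGHT-0]
    gap = gap_seconds(joined, revoked)
    default_timeout_s = 30  # default session.timeout.ms of 30000, per the text above
    print(f"{gap:.3f} s without a poll; exceeds default timeout: {gap > default_timeout_s}")
```

The gap comes out at about 33 seconds, a little over the 30-second default. That would fit a session expiry for thread-2, followed by thread-1 rejoining alone and taking both partitions in generation 3.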

Got it!
