kafka多线程消费(同一个group)

查看数: 44056 | 评论数: 3 | 收藏 0
关灯 | 提示:支持键盘翻页<-左 右->
    组图打开中,请稍候......
发布时间: 2016-8-30 11:52

正文摘要:

最近在做kafka的多线程消费。 碰到一个奇怪的问题。 按照kafka的介绍,同一个group的消费者consumer,应该是处理不同的数据,但是我在调试程序的时候出现了以下结果: 注:    topic: FREIGHT 有 ...

回复

linjikai8888 发表于 2016-9-8 09:59:53
消费者的活跃度

作为消费组的一部分,每个消费者会被分配它订阅的topics的一部分partitions.就像在这些partitions上加了一个组锁.
只要锁被持有,组中的其他成员就不会读取他们(译注:每个partition都对应唯一的消费者,partition锁只属于唯一的消费者).
当你的消费者是正常状态时,当然是最好不过了,因为这是防止重复消费的唯一方式.
但如果消费者失败了,你需要释放掉那个锁,这样可以将partitions分配给其他健康的成员.
kafka的消费组协调协议使用心跳机制解决了这个问题.在每次rebalance,所有当前generation的成员都会定时地发送心跳给group协调者.
只要协调者持续接收到心跳,它会假设这个成员是健康的. 每次接收到心跳,协调者就开始或者重置计时器.
如果时间超过了,没有收到消费者的心跳,协调者标记消费者为死亡状态,并触发组中其他的消费者重新加入,来重新分配partitions.
计时器的时间间隔就是session timeout,即客户端应用程序中配置的session.timeout.ms
session timeout确保应用程序崩溃或者partition将消费者和协调者进行了隔离的情况下锁会被释放.
注意应用程序的失败(进程还存在)有点不同,因为消费者仍然会发送心跳给协调者,并不代表应用程序是健康的.
消费者的轮询循环被设计为解决这个问题. 所有的网络IO操作在调用poll或者其他的阻塞API,都是在前台完成的.
消费者并不使用任何的后台线程. 这就意味着消费者的心跳只有在调用poll的时候才会发送给协调者.
如果应用程序停止polling(不管是处理代码抛出异常或者下游系统崩溃了),就不会再发送心跳了,
最终就会导致session超时(没有收到心跳,计时器开始增加), 然后消费组就会开始平衡操作.
唯一存在的问题是如果消费者处理消息花费的时间比session timeout还要长,就会触发一个假的rebalance.
可以通过设置更长的session timeout防止发生这样的情况.默认的超时时间是30秒,设置为几分钟也不是不行的.
更长的session timeout的缺点是,协调者会花费较长时间才能检测到真正崩溃的消费者.

明白了!
linjikai8888 发表于 2016-8-31 08:40:41
desehawk 发表于 2016-8-30 15:58
2016-08-30 11:37:02.951  INFO 5324 ---  o.a.k.c.c.internals.ConsumerCoordinator  : Setting newly ass ...

刚开始是没有错的。

忘了说明,后面的  log信息是代表  TOPIC-PARTITION-OFFSET

后面可以看到,另外一个线程来拿数据了,这样会拿到重复数据。

我要的理想情况应该是这个线程 什么都不做就好了。如果单线程肯定是没有问题的。
desehawk 发表于 2016-8-30 15:58:28
2016-08-30 11:37:02.951  INFO 5324 --- [pool-1-thread-1] o.a.k.c.c.internals.ConsumerCoordinator  : Setting newly assigned partitions [FREIGHT-0] for group FREIGHT
2016-08-30 11:37:02.972  INFO 5324 --- [pool-1-thread-2] o.a.k.c.c.internals.AbstractCoordinator  : Successfully joined group FREIGHT with generation 2
2016-08-30 11:37:02.973  INFO 5324 --- [pool-1-thread-2] o.a.k.c.c.internals.ConsumerCoordinator  : Setting newly assigned partitions [FREIGHT-1] for group FREIGHT

可以看下红字部分,应该是按照楼主的思路来的。
两个同组的消费者对应两个不同的分区。刚开始 partitions [FREIGHT-1] 有数据,对应- [pool-1-thread-2] 开始消费数据
partitions [FREIGHT-0] 有数据后, [pool-1-thread-1]开始消费

关闭

推荐上一条 /2 下一条