分享

kafka权威指南 第四章 第5节:提交和偏移量

本帖最后由 feilong 于 2017-6-30 08:16 编辑
问题导读

1.什么是一次提交(commit)?
2.什么是偏移量(offset)?
3.消费者如何提交偏移量?
4.KafkaConsumer API 提供的提交偏移量的方法有哪些?
5.commitSync()方法如何使用?






当我们调用poll()时,它会返回我们消费组中的消费者还未消费的写入kafka的记录。这意味着,我们有一种方法可以跟踪用户组读取的记录。正如我们之前所讨论过的,kafka的一个独特之处在于它不像许多JMS队列那样跟踪消费者的确认。相反,它允许消费者使用kafka跟踪它们在每个分区中的位置(偏移量)。

我们称在分区中更新当前位置的动作为提交(commit)。

消费者如何提交偏移量?它产生一个消息给kafka,同时会有一个特别的主题__consumer_offsets,用来记录每个分区已提交的偏移量。无论消费者启动,运行,离开,对它都不会产生什么影响。然而,如果消费者宕掉或新的消费者加入消费组,这将触发负载均衡。负载均衡后,每个消费者都可能被分配一组新的分区,而不是以前处理的分区。为了是系统正常运转,消费者会从每个分区最新提交的偏移量处继续读取、处理。

如果所提交的偏移量小于客户端处理的最后一个消息的偏移量,则最后处理的偏移量和提交的偏移量之间的消息将被处理两次。
图片1.png
如果所提交的偏移量大于客户端实际处理的最后一个消息的偏移量,则最后处理的偏移量和所提交的偏移量之间的所有消息将被消费组忽略。
图片2.png
清楚地管理偏移量对客户端应用程序有很大的影响。

KafkaConsumer API提供了多种提交偏移量的方法。


Automatic Commit 自动提交

提交偏移量好的方法就是让消费者为你工作。如果配置了enable.auto.commit = true,消费者将每5秒提交客户端从poll()方法返回的最大偏移量。5秒的间隔是默认的,可通过配置auto.commit.interval.ms来实现控制。与消费者中的其他内容一样,自动提交是由轮询循环驱动的。当轮询时,消费者检查是否提交,如果是的话,它将提交上次轮询中返回的偏移量。

然而,在使用这个方便的选项之前,了解其后果是很重要的。

考虑到自动提交每5秒发生一次,假设最近一次提交3秒后负载均衡触发,负载均衡后所有的消费者都将从最后的偏移量进行提交。在这种情况下,偏移量的生命周期是3秒,所有在这3秒内到达的事件将被处理两次。可以将提交间隔配置为更频繁地提交,并减少将重复记录的窗口,但不可能完全消除它们。
注意,启用自动提交后,对轮询的调用将始终执行上次轮询返回的最后偏移量。它不知道哪些事件实际上已被处理,所以在再次调用轮询之前,始终处理轮询返回的所有事件是至关重要的(或在调用close()之前,它也将自动提交偏移)。这通常不是问题,但在处理异常或提前退出轮询循环时要注意。
自动提交很方便,但它们没有给开发人员足够的控制权,避免重复消息。


Commit Current Offset 提交当前偏移量

大多数开发人员使用更多的的是控制偏移量提交的时间。既要消除丢失消息的可能性,又要减少负载均衡过程中消息重复的数量。消费者API可以选择在对应用程序开发人员来说有意义的点上提交当前偏移量,而不是基于定时器。

通过设置auto.commit.offset = false,只有当应用程序明确选择这样做时,才会提交偏移量。最简单和最可靠的提交API是commitsync()。这个API将提交方法poll()返回的最新偏移量,并且一旦偏移量被提交就会被返回,如果因一些原因提交失败则会抛出异常。

重要的是要记住,commitSync()将提交poll()返回的最新偏移量,所以调用commitSync(),之前要确保已经集合中所有的记录已处理完成,否则你将面临上述丢数据的风险。注意,当负载平衡被触发,从最新批次开始直到负载平衡时间内的所有消息将被处理两次。

这是一个 当我们处理完最新批次的消息后如何调用commitSync提交偏移量的示例。
[mw_shl_code=java,false]while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
{
System.out.println("topic = %s, partition = %s, offset = %d, customer =
%s, country = %s\n",
record.topic(), record.partition(), record.offset(),
record.key(), record.value());//
}
try {
consumer.commitSync();
} catch (CommitFailedException e) {
log.error("commit failed", e)
}
}[/mw_shl_code]


(未完待续...)

本帖被以下淘专辑推荐:

已有(3)人评论

跳转到指定楼层
lynglyn 发表于 2017-6-30 11:17:14
工作年限:三年以上
学历要求:本科
期望层级:P7
岗位描述:
1)从事前沿人工智能的新一代风控智能引擎搭建,为互联网金融各类传统、创新业务保驾护航;
2)运用海量数据,以数据为驱动,提供实时、智能、个性化的风控解决方案,以满足高性能、可扩展、高可靠的要求;
3)深入理解风控领域的业务,实际参与并主导风控智能引擎的规划、架构、设计和研发,能够推动智能引擎的落地实施,并保证项目进度和质量要求。
岗位要求:
1)计算机、电子信息工程等相关专业背景,本科及以上学历,3年以上Java研发经验;
2)熟悉spark等大数据相关技术,并至少具备2年以上大数据应用开发经验,对机器学习有经验者优先;
3)有扎实的Java基础, 熟练掌握jvm机制、多线程、常用容器、反射等基础知识;
4)熟练掌握Spring、Ibatis、缓存、消息等主流JAVA框架及原理,有分布式系统或者实时计算引擎的设计及项目经验,并能完成相应系统的设计和研发;
5)具有良好的学习能力、沟通技能和团队合作能力,热爱技术,有责任心。

工作地:成都
联系邮箱:shijie.gsj@alibaba-inc.com
有意愿的同学,发简历到邮箱,谢谢!
回复

使用道具 举报

redhat1986 发表于 2017-7-2 11:47:26
Properties props = new Properties();
             props.put("bootstrap.servers", "node1:9092");
             props.put("group.id", "tests");
             props.put("enable.auto.commit", "true");
             props.put("auto.commit.interval.ms", "1000");
             props.put("session.timeout.ms", "30000");
             props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
             props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
             KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
             consumer.subscribe(Arrays.asList("tests"));
             while (true) {
                 ConsumerRecords<String, String> records = consumer.poll(100);
                 for (ConsumerRecord<String, String> record : records)
                     System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
             }

一直读不到数据是什么情况?写的生产者可以往kafka里写数据,但是消费不了数据,代码不报错。请大家分析分析原因。
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条