分享

Kafka权威指南 第四章第7节:查找和准确处理

feilong 2017-7-14 15:36:03 发表于 连载型 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 1 7906

问题导读

1.如何从任意位置读取偏移量,消费消息?
2.seek()方法如何使用?





到目前为止,我们已经看到了如何使用poll()方法从每个分区最后提交的偏移量开始消费信息和继续处理序列中的所有消息。然而,有时会希望从不同的偏移量开始读取。

如果你想从分区开始位置读取所有消息,或者你想跳过所有的到分区的结尾开始消费的新信息,这都有专门的APIs : seekToBeginning(TopicPartition tp) 和 seekToEnd(TopicPartition tp)。

但是,Kafka API也允许你查找特定的偏移量。此能力可以以多种方式使用,例如返回很少的消息或跳过几条消息(可能是时间敏感的应用程序落后了,所以要跳过更多相关的消息),但这种能力最令人兴奋的用例是偏移量被存储在Kafka以外的系统中。

考虑一下这个常见场景:你的应用程序正在读取来自Kafka的事件(可能是一个网站的用户点击流),数据处理(也许是清理由机器人程序的点击并添加会话信息)然后将结果存储在数据库中,
NoSQL存储或Hadoop。假设我们真的不想丢失任何数据,或者我们希望在数据库中存储相同的结果两次。
在这种场景消费循环代码如下:
[mw_shl_code=java,false]while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
{
currentOffsets.put(new TopicPartition(record.topic(), record.partition()),
record.offset());
processRecord(record);
storeRecordInDB(record);
consumer.commitAsync(currentOffsets);
}
}
[/mw_shl_code]
注意,我们非常偏执,所以我们在处理万每个记录后提交偏移量。然而,我们的应用程序仍然有可能在记录存储到数据库之后崩溃,除非在提交偏移量之前,记录被处理,数据库中存储了重复记录。

如果只有一种方法来存储记录和偏移量,则可以避免这种情况。在一个原子动作中。无论是记录还是偏移要么都被提交,要么都不被提交。只要记录被写入数据库,偏移量写入kafka,这是不可能的。

但是如果我们在一个事务中把记录和偏移量写入数据库,又该怎么办呢?
然后我们就会知道,无论我们处理完了记录,偏移量被提交了,或者没有,记录都将被重新处理。

现在唯一的问题是:如果记录存储在数据库中而不是存储在kafka中,那么当分配一个分区时,我们的消费者如何知道该从哪里开始读取呢?这时seek()正可用于此。当消费者启动或新分区被分配时,它可以在数据库中查找偏移量并定位到那个位置。

下面是它如何工作的一个示例。我们用consumerrebalancelister和seek()确保我们从数据库中存储的偏移量位置开始处理。
[mw_shl_code=java,false]public class SaveOffsetsOnRebalance implements ConsumerRebalanceListener {
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
commitDBTransaction();
}
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
for(TopicPartition partition: partitions)
consumer.seek(partition, getOffsetFromDB(partition));
}
}
}
consumer.subscribe(topics, new SaveOffsetOnRebalance(consumer));
consumer.poll(0);
for (TopicPartition partition: consumer.assignment())
consumer.seek(partition, getOffsetFromDB(partition));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
{
processRecord(record);
storeRecordInDB(record);
storeOffsetInDB(record.topic(), record.partition(), record.offset());
}
commitDBTransaction();
}[/mw_shl_code]

有多种不同的方式可以通过将偏移量和数据存储到外部存储来实现消息只一次传输,但是所有的这些方法都需要使用ConsumerRebalance Listener和seek() 来确保偏移量及时存储,消费者从正确的位置读取消息。

本帖被以下淘专辑推荐:

已有(1)人评论

跳转到指定楼层
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条