分享

kafka权威指南 第四章第4节 轮询循环

levycui 2017-7-18 14:16:13 发表于 连载型 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 1 10099
本帖最后由 levycui 于 2017-7-18 14:21 编辑
问题导读:
1、轮询循环处理哪些工作?
2、消费者主体关注哪些内容?
3、consumer如何确认生存状态?
4、同一组中如何运行多个消费者?



轮询循环
consumer API的核心是一个简单的循环,用于轮询服务器以获取更多的数据。
一旦使用者订阅topic,轮询循环就会处理协调、分区重新平衡、心跳和数据获取的所有细节,从而使开发人员拥有一个干净的API,该API可以从指定的分区返回可用的数据。消费者的主体会关注如下:
[mw_shl_code=java,true]try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records)
        {
        log.debug("topic = %s, partition = %s, offset = %d, customer = %s,
        country = %s\n",
       record.topic(), record.partition(), record.offset(), record.key(),
       record.value());
       int updatedCount = 1;
        if (custCountryMap.countainsValue(record.value())) {
            updatedCount = custCountryMap.get(record.value()) + 1;
        }
        custCountryMap.put(record.value(), updatedCount)
        JSONObject json = new JSONObject(custCountryMap);
        System.out.println(json.toString(4))
       }
    }
} finally {
    consumer.close();
}
[/mw_shl_code]
1、这确实是一个无限循环。consumer通常是一个长期运行的应用程序,它不断地对kafka进行检查以获取更多的数据。我们将在后面的章节中展示如何清晰地退出循环并关闭使用者。

2、这是本章中最重要的一行。鲨鱼必须不断移动,否则就会死亡,同样的道理,consumer必须继续对kafka进行轮询,否则他们将被视为死亡,而他们所消费的分区将被移交给另一个consumer,以继续消费。

3、poll()返回一个记录列表。每条记录都包含记录的Topic和分区,以及分区内记录的偏移量,当然还有记录的键值和值。通常,我们希望对列表进行迭代,并单独处理记录。poll()方法接受一个超时参数。它指定了需要多长时间进行轮询返回,无论是否有数据。该值通常由应用程序的快速响应驱动——您希望将控制返回到执行轮询的线程的速度有多快?

4、处理通常以在数据存储中写入结果或更新存储记录而结束。在这里,我们的目标是保持每个分区的消费者数量,所以我们更新一个哈希表,并将结果打印为JSON。一个更实际的例子是将更新结果存储在一个数据存储中。

5、在退出前总是close() consumer。这将关闭网络连接的套接字并将立即触发一个平衡而不是等待组协调器发现消费者停止了心跳,很可能已经死了, 这将花费更长的时间,因此会导致更长的一段时间,在此期间,没有人会从分区的子集中接收消息。

轮询循环所做的不仅仅是获取数据。第一次使用consumer调用poll()时,它负责查找GroupCoordinator,加入消费者组并接受一个分区分配。如果触发了再平衡,那么它也将在轮询循环中进行处理。当然,让消费者存活的心跳是在投票循环中发送的。出于这个原因,我们试图确保在迭代过程中所做的任何处理都是快速有效的。请注意,您不能在一个线程中拥有多个属于同一组的多个消费者,并且你不能让多个线程安全地使用相同的消费者。每个线程有一个使用者是一种规则。

要在同一个应用程序中运行同一组中的多个消费者,您需要在自己的线程中运行每个用户。将消费者逻辑包装在自己的对象中是很有用的,然后使用Java的ExecutorService来启动多个线程,每个线程都有自己的客户。Confluent blog有一个教程(https://www.confluent.io/blog/tu ... -9-consumer-client/),展示了如何做到这一点。

原文:
The Poll Loop
At the heart of the consumer API is a simple loop for polling the server for more data.

Once the consumer subscribes to topics, the poll loop handles all details of coordination, partition rebalances, heartbeats and data fetching, leaving the developer with a clean API that simply returns available data from the assigned partitions. The main body of a consumer will look at follows:

[mw_shl_code=java,true]try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records)
        {
        log.debug("topic = %s, partition = %s, offset = %d, customer = %s,
        country = %s\n",
       record.topic(), record.partition(), record.offset(), record.key(),
       record.value());
       int updatedCount = 1;
        if (custCountryMap.countainsValue(record.value())) {
            updatedCount = custCountryMap.get(record.value()) + 1;
        }
        custCountryMap.put(record.value(), updatedCount)
        JSONObject json = new JSONObject(custCountryMap);
        System.out.println(json.toString(4))
       }
    }
} finally {
    consumer.close();
}
[/mw_shl_code]

1、This is indeed an infinite loop. Consumers are usually a long-running application that continuously polls Kafka for more data. We will show later in the chapter how to cleanly exit the loop and close the consumer.

2、This is the most important line in the chapter. The same way that sharks must keep moving or they die, consumers must keep polling Kafka or they will be considered dead and the partitions they are consuming will be handed to another consumer in the group to continue consuming.

3、poll() returns a list of records. Each record contains the topic and partition the record came from, the offset of the record within the partition, and of course the key and the value of the record. Typically we want to iterate over the list and process the records individually. poll() method takes a timeout parameter. This specifies how long it will take poll to return, with or without data. The value is typically driven by application needs for quick responses - how fast do you want to return control to the thread that does the polling?

4、Processing usually ends in writing a result in a data store or updating a stored record. Here, the goal is to keep a running count of customers from each county,so we update a hashtable and print the result as JSON. A more realistic example would store the updates result in a data store.

5、Always close() the consumer before exiting. This will close the network connections and the sockets and will trigger a rebalance immediately rather than wait for the Group Coordinator to discover that the consumer stopped sending heartbeats and is likely dead, which will take longer and therefore result in a longer period of time during which no one consumes messages from a subset of the partitions.

The poll loop does a lot more than just get data. The first time you call poll() with a new consumer, it is responsible for finding the GroupCoordinator, joining the consumer group and receiving a partition assignment. If a rebalance is triggered, it will be handled inside the poll loop as well. And of course the heartbeats that keep consumers alive are sent from within the poll loop. For this reason, we try to make sure that whatever processing we do between iterations is fast and efficient.Note that you can’t have multiple consumers that belong to the same group in one thread and you can’t have multiple threads safely use the same consumer. One consumer per thread is the rule.

To run multiple consumers in the same group in one application, you will need to run each in its own thread. It is useful to wrap the consumer logic in its own object, and then use Java’s ExecutorService to start multiple threads each with its own consumer. Confluent blog has a tutorial that shows how to do just that.

本帖被以下淘专辑推荐:

已有(1)人评论

跳转到指定楼层
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条