分享

kafka权威指南 第四章第2、3节 创建kafka消费者并订阅Topics

levycui 2017-6-20 16:59:40 发表于 连载型 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 2 12029
问题导读:
1、kafka如何创建kafka消费者?
2、kafka创建消费者重要属性有哪些?
3、kafka如何订阅Topics?




创建kafka消费者
开始消费的第一步是创建一个KafkaConsumer实例,创建一个KafkaConsumer非常类似于创建一个KafkaProducer,你可以创建一个Java属性实例,并使用你希望传递给使用者的属性。我们将在后面的章节中详细讨论所有的属性。首先,我们只需要使用3个属性:bootstrap.servers, key.deserializer 和 value.deserializer.

第一个属性,bootstrap.servers是到kafka集群的连接字符串,它的使用方式和它在KafkaProducer使用方式是一样的,你可以参考第三章来了解具体的细节,另外两个属性key.deserializer和value.deserializer,类似于为生产者定义的序列化器,而不是指定将Java对象转换为ByteArray类,你需要指定使用ByteArray并将其转换为Java对象的类。

还有第四个属性,不是强制性的,但是现在我们要假装它是。这个属性是group.id和它指定了消费者组Kafka‐Consumer实例。虽然有可能创建不属于任何消费者组的消费者,这种情况不那么常见,在本章的大部分内容中我们假设消费者是组的一部分。

下面的代码片段展示了如何创建一个KafkaConsumer:
[mw_shl_code=java,true]Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("group.id", "CountryCounter");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String,
String>(props);[/mw_shl_code]
如果你阅读了关于创建生产者的第三章,你所看到的大部分内容应该是非常熟悉的。我们计划使用字符串作为键和值,因此,我们使用内置的StringDeserializer,我们创建了带有字符串类型的KafkaConsumer。

这里唯一的新属性是group.id - 这是消费者组的名字,这是消费者的一部分。

订阅Topics
一旦我们创建了一个消费者,下一步就是订阅一个或多个Topics。subcribe()方法将Topics列表作为参数,所以使用起来很简单:
[mw_shl_code=java,true]consumer.subscribe(Collections.singletonList("customerCountries"));[/mw_shl_code]
这里我们简单地创建一个列表,其中有一个元素,Topics名“customer‐Countries”,也可以使用正则表达式调用订阅,这个表达式可以匹配多个Topics名称,如果有人创建了一个匹配的新Topics,重新平衡将几乎立即发生并且消费者将开始从新的Topics开始消费。这对于需要从多个Topics中使用的应用程序非常有用,可以处理Topics将包含的不同类型的数据。在kafka和另一个系统之间复制数据的应用程序中最常见。

要订阅所有测试的Topics,我们可以调用:
[mw_shl_code=java,true]consumer.subscribe("test.*");[/mw_shl_code]

原文:
Creating a Kafka Consumer
The first step to start consuming records is to create a KafkaConsumer instance. Creating a KafkaConsumer is very similar to creating a KafkaProducer - you create a Java Properties instance with the properties you want to pass to the consumer. We will discuss all the properties in depth later in the chapter. To start we just need to use the 3 mandatory properties: bootstrap.servers, key.deserializer and value.deserializer.

The first property, bootstrap.servers is the connection string to Kafka cluster. It is used the exact same way it is used in KafkaProducer, and you can refer to Chapter 3 to see specific details on how this is defined. The other two properties key.deserializer and value.deserializer are similar to the serializers defined for the producer,but rather than specifying classes that turn Java objects to a ByteArray, you need to specify classes that can take a ByteArray and turn it into a Java object.

There is a fourth property, which is not strictly mandatory, but for now we will pretend it is. The property is group.id and it specifies the Consumer Group the Kafka‐Consumer instance belongs to. While it is possible to create consumers that do not belong to any consumer group, this is far less common and for most of the chapter we will assume the consumer is part of a group.

The following code snippet shows how to create a KafkaConsumer:
[mw_shl_code=java,true]Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("group.id", "CountryCounter");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String,
String>(props);[/mw_shl_code]
Most of what you see here should be very familiar if you’ve read Chapter 3 on creating producers. We are planning on consuming Strings as both key and value, so we use the built-in StringDeserializer and we create KafkaConsumer with String types.

The only new property here is group.id - which is the name of the consumer group this consumer will be part of.

Subscribing to Topics
Once we created a consumer, the next step is to subscribe to one or more topics. The subcribe() method takes a list of topics as a parameter, so its pretty simple to use:
[mw_shl_code=java,true]consumer.subscribe(Collections.singletonList("customerCountries"));[/mw_shl_code]
Here we simply create a list with a single element, the topic name “customer‐Countries”It is also possible to call subscribe with a regular expression. The expression can match multiple topic names and if someone creates a new topic with a name that matches, a rebalance will happen almost immediately and the consumers will start consuming from the new topic. This is useful for applications that need to consume from multiple topics and can handle the different types of data the topics will contain. It is most common in applications that replicate data between Kafka and another system.

To subscribe to all test topics, we can call:
[mw_shl_code=java,true]consumer.subscribe("test.*");[/mw_shl_code]

本帖被以下淘专辑推荐:

已有(2)人评论

跳转到指定楼层
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条