分享

使用0.9版Kafka的KafkaConsumer来消费kafka数据,但消费不成功

zstu 发表于 2017-5-9 17:05:14 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 19 46714

我用下面的代码来消费kafka的数据,但一直停在ConsumerRecords<String, String> records = consumer.poll(100);这一行。没有消费数据,也不报错。用spark streaming来消费这个topoic的数据是可以消费的。
[mw_shl_code=java,true]import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class KafkaConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "10.11.105.111:9092,10.11.105.112:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("topic1"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
        }
    }
}[/mw_shl_code]

已有(19)人评论

跳转到指定楼层
einhep 发表于 2017-5-9 17:42:12
本帖最后由 einhep 于 2017-5-9 17:45 编辑

props.put("bootstrap.servers", "10.11.105.111:9092,10.11.105.112:9092");改为 props.put("bootstrap.servers", "10.11.105.111:9092");
不需要所有的都指定
并且是有数据的话,就消费,没有数据的话就等100sms


回复

使用道具 举报

zstu 发表于 2017-5-10 08:54:40
einhep 发表于 2017-5-9 17:42
props.put("bootstrap.servers", "10.11.105.111:9092,10.11.105.112:9092");改为 props.put("bootstrap.s ...

bootstrap.servers改过来了,并且topic中有数据。但还是消费不了。
回复

使用道具 举报

nextuser 发表于 2017-5-10 09:47:56
zstu 发表于 2017-5-10 08:54
bootstrap.servers改过来了,并且topic中有数据。但还是消费不了。

生产者代码贴出来看下
回复

使用道具 举报

NEOGX 发表于 2017-5-10 09:59:38
zstu 发表于 2017-5-10 08:54
bootstrap.servers改过来了,并且topic中有数据。但还是消费不了。

0.9不稳定,好像存在这么个现象,楼主试试1.0版本的
回复

使用道具 举报

zstu 发表于 2017-5-10 10:02:28
nextuser 发表于 2017-5-10 09:47
生产者代码贴出来看下

生产数据是别的公司给生产的, 我这没有,但我用spark streaming就能消费。
回复

使用道具 举报

zstu 发表于 2017-5-10 10:13:54
本帖最后由 zstu 于 2017-5-10 10:15 编辑
NEOGX 发表于 2017-5-10 09:59
0.9不稳定,好像存在这么个现象,楼主试试1.0版本的

集群部署不是0.10版的 ,我用了kafka-client-0.10的包后,报错了Error reading field 'brokers': Error reading field 'host': Error reading string of length 12601, only 210 bytes available 查了一下是程序的kafka版本跟集群的版本不一致
回复

使用道具 举报

zxmit 发表于 2017-5-10 17:56:01
用kafka-console-consumer脚本能够消费么。还有楼主把日志给贴一下
回复

使用道具 举报

zstu 发表于 2017-5-10 19:35:42
zxmit 发表于 2017-5-10 17:56
用kafka-console-consumer脚本能够消费么。还有楼主把日志给贴一下

kafka-console-consumer能消费 。
日志:
log4j:WARN No appenders could be found for logger(org.apache.kafka.clients.consumer.ConsumerConfig).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
>>>>>>>>>>>>> 开始消费数据


日志就一直挂在这不动。


我在consumer.poll(100)这一行上面加了一句System.out.println(">>>>>>>>>>>>> 开始消费数据");
回复

使用道具 举报

einhep 发表于 2017-5-10 20:04:46
zstu 发表于 2017-5-10 19:35
kafka-console-consumer能消费 。
日志:
log4j:WARN No appenders could be found for logger(org.apa ...

在后面也加上一句,看执行到哪
回复

使用道具 举报

12下一页
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条