分享

求助大神! kafaka无法消费问题

本帖最后由 gdz911 于 2017-7-2 22:25 编辑

自己学习 做了个 log4j-flume-kafka 发现linux(在虚拟机上)上用自带的可消费javaDemo里面的log4信息,通过kafka-clients 却无法消费
kafka-clients 版本0.11.0.0
flume配置如下
5.png

发送给flume代码
[mw_shl_code=java,true]public class Demo {

        protected static final Logger logger = LoggerFactory.getLogger(Demo.class);

        public static void main(String[] args) throws Exception {

                while (true) {
                        logger.info(String.valueOf(new Date().getTime()) + "---" + "我是INFO");
                        Thread.sleep(3000);

                }

        }

}[/mw_shl_code]

kafka控制台显示
4.png

消费者代码
[mw_shl_code=java,true]        @Test
        public void kafkaConsumerDemo() {
                Properties props = new Properties();
                props.put("bootstrap.servers", "192.168.229.131:9092");
                props.put("enable.auto.commit", "true");
                props.put("auto.commit.interval.ms", "1000");
                props.put("session.timeout.ms", "30000");
                props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
                props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
                KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
                consumer.subscribe(Arrays.asList("test"));
                System.out.println("----------");
                for (ConsumerRecord<String, String> record : consumer.poll(1000)){
                //System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());               
                                System.out.printf("xxxx");                                    
                        }
               
        }[/mw_shl_code]

控制台信息
3.png
2.png
最后控制台显示了 消费的错误信息


1.png

已有(1)人评论

跳转到指定楼层
gdz911 发表于 2017-7-2 22:53:00
已经解决了 kafka配置文件 加入host.name=ip  即可 不然消费者指向localhost
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条