分享

关于Kafka Consumer 无法完整获取topic消息的问题

chimes298 发表于 2015-10-10 16:41:36 [显示全部楼层] 只看大图 回帖奖励 阅读模式 关闭右栏 3 44063
写了一个简单的kafka producer发送消息,consumer消费的例子
Producer程序如下:
[mw_shl_code=java,true]public class KafkaProducer {
        public static void main(String[] args) throws Exception {

                int events = 1000;

                // 设置配置属性
                Properties props = new Properties();
                props.put("metadata.broker.list",
                                "localhost:9092");
                props.put("serializer.class", "kafka.serializer.StringEncoder");
                // props.put("partitioner.class",
                // "com.catt.kafka.demo.PartitionerDemo");
                // 触发acknowledgement机制,否则是fire and forget,可能会引起数据丢失
                // 值为0,1,-1,可以参考
                // http://kafka.apache.org/08/configuration.html
                props.put("request.required.acks", "1");
                ProducerConfig config = new ProducerConfig(props);

                // 创建producer
                Producer<String, String> producer = new Producer<String, String>(config);
                // 产生并发送消息

                for (int i = 0; i < events; i++) {

                        String ts = String.valueOf(System.currentTimeMillis());

                        String key1 =  ""+i;
                        String msg1 = ts;
                       
                        KeyedMessage<String, String> data = new KeyedMessage<String, String>(
                                        "topic1", key1, msg1);
                        producer.send(data);
                        System.out.println(i+"\t"+msg1);
                        Thread.sleep(2000);
                }
                // 关闭producer
                producer.close();
        }
}[/mw_shl_code]

Consumer程序如下:
[mw_shl_code=java,true]public class KafkaConsumer {
        public static void main(String[] args) {  
            // specify some consumer properties  
                Properties props = new Properties();  
                props.put("zookeeper.connect", "localhost:2181");  
                props.put("zookeeper.connectiontimeout.ms", "1000000");  
                props.put("group.id", "test_group");  

                // Create the connection to the cluster  
                ConsumerConfig consumerConfig = new ConsumerConfig(props);  
                ConsumerConnector connector = Consumer.createJavaConsumerConnector(consumerConfig);  

                //选择topic
            Map<String,Integer> topics = new HashMap<String,Integer>();  
            // create 2 partitions of the stream for topic “test-topic”, to allow 2 threads to consume
            topics.put("topic1", new Integer(1));  
            Map<String, List<KafkaStream<byte[], byte[]>>> topicMessageStreams = connector.createMessageStreams(topics);  
            List<KafkaStream<byte[], byte[]>> streams = topicMessageStreams.get("topic1");
            
            //产生两个threads
            ExecutorService threadPool = Executors.newFixedThreadPool(1);  
            //消费消息
            for (final KafkaStream<byte[], byte[]> stream : streams) {  
                    threadPool.submit(new Runnable() {  
                public void run() {  
                    ConsumerIterator<byte[], byte[]> it = stream.iterator();
                    while (it.hasNext())
                        System.out.println("topic: " + new String(it.next().topic()) +
                                                           "\t key:" + new String(it.next().key()) +
                                                           "\t content:"  + new String(it.next().message()));
                }  
            });  
        }   
        }
}[/mw_shl_code]


在Producer中,消息的key是按照i递增的,打印出来如下图显示:

producer

producer

但是在Consumer中接收到的消息全部是完整的:

consumer

consumer


求教为什么consumer都是隔着3个读取的?我在broker保存的文件中看到topic1-0的数据是完整的,说明发布成功了,但是为什么订阅不完整?

已有(3)人评论

跳转到指定楼层
leo_1989 发表于 2015-10-10 19:08:16
是不是只接收了一个Partition的
回复

使用道具 举报

chimes298 发表于 2015-10-12 18:27:06
本帖最后由 chimes298 于 2015-10-12 18:29 编辑
leo_1989 发表于 2015-10-10 19:08
是不是只接收了一个Partition的

搞明白了,是我智商捉鸡了。。。汗:

[mw_shl_code=java,true]System.out.println("topic: " + new String(it.next().topic()) + "\t key:" + new String(it.next().key()) +"\t content:"  + new String(it.next().message()));
[/mw_shl_code]

这一句中,连用了三个it.next()。。。

回复

使用道具 举报

leo_1989 发表于 2015-10-13 11:53:10
chimes298 发表于 2015-10-12 18:27
搞明白了,是我智商捉鸡了。。。汗:

[mw_shl_code=java,true]System.out.println("topic: " + new St ...

it.next(),连续三个就跳了三个
这里还真不太好处理,不太好抽象
new String(it.next().topic())
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条