分享

kafka api怎么保证不重复发数据

shfshihuafeng 发表于 2017-4-20 14:53:52 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 3 6995

我调用这个方法 从kafka取到重复数据,怎么保证不消费重复数据


private void putDataByKafka(Properties props, ClientHandler handler) {

                // 获取kafka数据
                System.out.println("从kafka获取数据");
                int i = 1;

                String topic = props.getProperty("topic.name");
               
                ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

                Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
                topicCountMap.put(props.getProperty("topic.name"), 1); // 一次从主题中获取一个数据
                Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumer.createMessageStreams(topicCountMap);
                KafkaStream<byte[], byte[]> stream = messageStreams.get(topic).get(0);// 获取每次接收到的这个数据
                ConsumerIterator<byte[], byte[]> iterator = stream.iterator();

                while (iterator.hasNext()) {
                        System.out.println(handler.toString()+":开始消费");
                        MessageAndMetadata<byte[], byte[]> messagemeta = iterator.next();
               
                        long offset = messagemeta.offset();
                        int partition=messagemeta.partition();
                        String message = new String(messagemeta.message());
                       
                        // hostName+";"+ip+";"+commandName+";"+res+";"+System.currentTimeMillis();
                        // 这里指的注意,如果没有下面这个语句的执行很有可能回从头来读消息的
                        try {
                                handler.sendClientMsg(offset + ":" + message);
                                System.out.println(handler.toString()+":send data 第" + i + "条:" + "消息:" + message+ "分区:"+partition+ "偏移:"+offset);
                               
                                consumer.commitOffsets();
                                i++;
                        } catch (IOException e) {
                               
                                long now = System.currentTimeMillis();
                                Map<TopicAndPartition, OffsetAndMetadata> tpAndOffsetMetadata = new HashMap<TopicAndPartition, OffsetAndMetadata>();
                                tpAndOffsetMetadata.put(
                                                new TopicAndPartition(messagemeta.topic(), messagemeta.partition()),
                                new OffsetAndMetadata(
                                                new OffsetMetadata(messagemeta.offset(), UUID.randomUUID().toString()),
                                                -1L,-1L)
                                );               
                                System.out.println("异常send data 第" + i + "条:" + offset + ":" + message+ "分区:"+partition);
                               
                                consumer.commitOffsets(tpAndOffsetMetadata, true);
                               
                                consumer.shutdown();
                                e.printStackTrace();
                               
                                return;
                        }
                }
               
                consumer.shutdown();
        }

已有(3)人评论

跳转到指定楼层
2017 发表于 2017-4-20 15:24:41
如果正常来讲,一般不会重复。除非异常。
容错里面可以在改改
回复

使用道具 举报

shfshihuafeng 发表于 2017-4-20 16:01:56
怎么改呀,kafka没有异常

回复

使用道具 举报

arsenduan 发表于 2017-4-20 16:05:52
本帖最后由 arsenduan 于 2017-4-20 16:08 编辑
shfshihuafeng 发表于 2017-4-20 16:01
怎么改呀,kafka没有异常

楼主怎么重复了,需要知道什么情况下重复。这个是比较重要的。kafka应该只有丢失的可能性。重复的可能性很小。
当然就如楼主所说有重复的。还有种比较笨的方法,就是前一个消息,跟后一个消息比较,如果相同就放弃。当然这个也不是好的解决办法。关键还是找到为啥会重复



回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条