分享

spark createDirectStream保存kafka offset

sinv2015 发表于 2017-10-17 19:13:48 [显示全部楼层] 只看大图 回帖奖励 阅读模式 关闭右栏 8 14485
网上要不然是scala实现,要不然就是转载别人的java实现(中间实现有错误),有没有大佬会这个的,求指点

已有(8)人评论

跳转到指定楼层
easthome001 发表于 2017-10-17 19:20:43
本来就两个版本,而且环境不同,可能有所不同的,关键是理解思路,然后自己去实现。需要描述你遇到什么问题。
回复

使用道具 举报

sinv2015 发表于 2017-10-17 19:26:39
easthome001 发表于 2017-10-17 19:20
本来就两个版本,而且环境不同,可能有所不同的,关键是理解思路,然后自己去实现。需要描述你遇到什么问题 ...

一种是用checkpoint来解决,但是我遇到了一个很奇怪的问题,我自己写了一个给kafka输入数据的程序,先发100条,这时打开sparkstreaming来接收,收到100条。然后关闭sparkstreaming(模拟spark意外关闭),然后再给kafka输入120条,打开sparkstreaming,这里会奇怪的出现,先收到了120条数据,然后又收到了上次已经读过的100条数据。
第二种方法是自己写入zookeeper中,我仿照写的时候写到了这里,如下图:
QQ图片20171017192559.png
提示为不能访问 scala.Predef.$less$colon$less,解决不了。




回复

使用道具 举报

sstutu 发表于 2017-10-17 20:35:00
sinv2015 发表于 2017-10-17 19:26
一种是用checkpoint来解决,但是我遇到了一个很奇怪的问题,我自己写了一个给kafka输入数据的程序,先发1 ...

第一个,spark streaming有防止数据丢失机制,所以你重启,数据又发送了,说明spark streaming做的非常好。同时说明,你的关闭和开启的时间不到超时的时间。

第二个,是不是你执行的用户不对,导致不能访问。
这里推荐篇文章,希望有所帮助
将 Spark Streaming + Kafka direct 的 offset 存入Zookeeper并重用
http://www.aboutyun.com/forum.php?mod=viewthread&tid=20244


回复

使用道具 举报

sinv2015 发表于 2017-10-17 20:42:08
sstutu 发表于 2017-10-17 20:35
第一个,spark streaming有防止数据丢失机制,所以你重启,数据又发送了,说明spark streaming做的非常好 ...

我看了你说那个,我得通过java实现
回复

使用道具 举报

sinv2015 发表于 2017-10-17 20:47:30
sstutu 发表于 2017-10-17 20:35
第一个,spark streaming有防止数据丢失机制,所以你重启,数据又发送了,说明spark streaming做的非常好 ...

还有一般超时时间是多少
回复

使用道具 举报

sstutu 发表于 2017-10-17 21:13:24
sinv2015 发表于 2017-10-17 20:42
我看了你说那个,我得通过java实现

这个是Java版的基于 spark 1.6 kafka 0.8[mw_shl_code=java,true] import kafka.common.TopicAndPartition;
    import kafka.message.MessageAndMetadata;
    import kafka.serializer.StringDecoder;
    import kafka.utils.ZKGroupTopicDirs;
    import org.I0Itec.zkclient.ZkClient;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.streaming.api.java.JavaInputDStream;
    import org.apache.spark.streaming.api.java.JavaPairDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.kafka.KafkaUtils;
    import scala.Tuple2;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Set;


    /**
     * Created by shengjk1 on 2016/10/8.
     * Blog Address:http://blog.csdn.net/jsjsjs1789
     */

    public static JavaStreamingContext createContext() {

        final SparkConf conf = new SparkConf().setAppName("scan");
        final JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(60));
        final HiveContext sqlContext = new HiveContext(jssc.sc());
        final AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference<>();

        //checkpoint失败后会自动重启,会造成数据丢失
        //sqlContext.setConf("hive.optimize.ppd", "false");
        //jssc.checkpoint("hdfs://centos11:9000/cp");

        Map<String, String> kafkaParams = new HashMap<>();
        kafkaParams.put("metadata.broker.list", "centos11:9092");
        Set<String> topics = new HashSet<>();
        topics.add("20161008");


        final String zkServer="centos12:2181";
        //ZkClient zkClient = new ZkClient(zkServer, 60 * 1000, 60 * 1000);
        ZkClient zkClient=new ZkClient(zkServer);
        final String topic="20161008";


        //kafka groupid
        ZKGroupTopicDirs zgt=new ZKGroupTopicDirs("test-consumer-group",topic);
        final String zkTopicPath=zgt.consumerOffsetDir();

        int countChildren=zkClient.countChildren(zkTopicPath);


        Map<TopicAndPartition,Long> fromOffsets=new HashMap<>();

        //每一个children都是一个partition
        if(countChildren>0){
            for (int i = 0; i < countChildren; i++) {
                String path=zkTopicPath+"/"+i;

                logger.info("========================zk地址 "+path);
                String offset=zkClient.readData(path);
                TopicAndPartition  topicAndPartition=new TopicAndPartition(topic,i);
                fromOffsets.put(topicAndPartition,Long.parseLong(offset));
            }

            KafkaUtils.createDirectStream(
                    jssc,
                    String.class,
                    String.class,
                    StringDecoder.class,
                    StringDecoder.class,
                    String.class,
                    kafkaParams,
                    fromOffsets,
                    new Function<MessageAndMetadata<String, String>, String>() {
                        public String call(MessageAndMetadata<String, String> v1) throws Exception {
                            return v1.message();
                        }
                    }
            ).transform(new Function<JavaRDD<String>, JavaRDD<String>>() {
                @Override
                public JavaRDD<String> call(JavaRDD<String> v1) throws Exception {
                    OffsetRange[] offsets = ((HasOffsetRanges) v1.rdd()).offsetRanges();
                    logger.info("=======================offsetsoffsetsoffsetsoffsets  "+offsets);
                    offsetRanges.set(offsets);
                    return v1;
                }
            }).foreachRDD(new VoidFunction<JavaRDD<String>>() {
                @Override
                public void call(JavaRDD<String> stringJavaRDD1) throws Exception {


                    //业务处理
                    ......


                    //回写zk
                    ZkClient zkClient=new ZkClient(zkServer);
                    OffsetRange[] offsets = offsetRanges.get();
                    if (null != offsets) {
                        logger.info("scan ===================zk开始更新 offsets" + offsets.length);
                        ZKGroupTopicDirs zgt = new ZKGroupTopicDirs("test-consumer-group", topic);
                        String zkTopicPath = zgt.consumerOffsetDir();
                        for (OffsetRange o : offsets) {
                            String zkPath = zkTopicPath + "/" + o.partition();
                            ZkUtils.updatePersistentPath(zkClient, zkPath, o.untilOffset() + "");
                            logger.info("scan ===================zk更新完成 path: " + zkPath);
                        }
                        zkClient.close();
                    }

                }
            });
        }else{
            KafkaUtils.createDirectStream(jssc,
                    String.class,
                    String.class,
                    StringDecoder.class,
                    StringDecoder.class,
                    kafkaParams,
                    topics)
                    .transformToPair(new Function<JavaPairRDD<String, String>, JavaPairRDD<String, String>>() {
                @Override
                public JavaPairRDD<String, String> call(JavaPairRDD<String, String> v1) throws Exception {
                    OffsetRange[] offsets = ((HasOffsetRanges) v1.rdd()).offsetRanges();

                    logger.info("=elseelseelseelseelseelse======================offsetsoffsetsoffsetsoffsets  "+offsets);

                    offsetRanges.set(offsets);
                    return v1;
                }

            }).map(new Function<Tuple2<String,String>, String>() {
                @Override
                public String call(Tuple2<String, String> v1) throws Exception {
                    return v1._2();
                }
            }).foreachRDD(new VoidFunction<JavaRDD<String>>() {
                @Override
                public void call(JavaRDD<String> stringJavaRDD) throws Exception {


                    //业务处理
                    ......



                    //回写zk
                    ZkClient zkClient=new ZkClient(zkServer);
                    OffsetRange[] offsets = offsetRanges.get();
                    if (null != offsets) {
                        logger.info("scan ===================zk开始更新 offsets" + offsets.length);
                        for (OffsetRange o : offsets) {
                            String zkPath = zkTopicPath + "/" + o.partition();
                            ZkUtils.updatePersistentPath(zkClient, zkPath, o.untilOffset() + "");
                            logger.info("scan ===================zk更新完成 path: " + zkPath);
                        }
                        zkClient.close();
                    }

                }
            });[/mw_shl_code]

最大的一个问题是代码重复率太高,尝试过代码重构,即返回JavaInputDStream和JavaPairDStream的一个父类,但最后失败了,会报一些类型转化错误,也尝试过使用同一个zkClient,这样的话,ZkClient需要final,而org.I0Itec.zkclient.ZkClient包下的zkCLient是没有办法序列化的,有时间的话可以自己写一个zkClient,zkClient部分的重复代码应该可以解决掉。
也参考过scala的相应代码,它之所以没有重复代码,在于scala可以自动判断返回值得类型。

解决代码重复简单粗暴的方法就是进行代码的封装了。

注意:
1.特别是对于SparkStreaming连接kafka仅仅checkpoints也会导致数据丢失,无法保证at only one。此处着重说明一下若是因为spark代码导致的失败,checkpoints可以保证at only one,但若spark代码执行完毕由于插入数据库时程序失败,即使checkpoint也无法保证at only one
2.此版本更新offset处给予kafka-0.8,而kafka-0.9应该这样:

[mw_shl_code=java,true]new ZkUtils(zkClient, null, false).updatePersistentPath(zkPath, o.untilOffset() + "", ZooDefs.Ids.OPEN_ACL_UNSAFE);
[/mw_shl_code]


来自
http://blog.csdn.net/jsjsjs1789/article/details/52823218



回复

使用道具 举报

sinv2015 发表于 2017-10-17 21:24:37
sstutu 发表于 2017-10-17 21:13
这个是Java版的基于 spark 1.6 kafka 0.8[mw_shl_code=java,true] import kafka.common.TopicAndPartitio ...

歇歇,我刚刚找到了这个代码。试下不知道能不能用
回复

使用道具 举报

sinv2015 发表于 2017-10-19 19:38:44
sstutu 发表于 2017-10-17 21:13
这个是Java版的基于 spark 1.6 kafka 0.8[mw_shl_code=java,true] import kafka.common.TopicAndPartitio ...

我今天拜读了下这个代码,有点疑问求解释,他的原理无非是先从zookeeper中读出offset,然后处理完数据更新,但是为何if(countChildren>0){}else{}要这样,求解惑,我看了有些代码就没有
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条