about云开发-活到老 学到老

 找回密码
 立即注册

QQ登录

只需一步,快速开始

扫一扫,访问微社区

查看: 2238|回复: 0

spark消费kafka数据手动维护offset

[复制链接]

4

主题

2

听众

0

收听

注册会员

Rank: 2

积分
65
发表于 2018-1-12 18:09:29 | 显示全部楼层 |阅读模式
scala代码

package org.apache.spark.test

import kafka.serializer.StringDecoder
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.SparkConf
import org.apache.spark.TaskContext
import org.apache.spark.rdd.RDD

object TestDirectKafka2 {

  def main(args: Array[String]) {
    val args2 = Array[String]("192.168.100.130:9091,192.168.100.130:9092", "test")
    if (args2.length < 2) {
      System.err.println("")
      System.exit(1)
    }

    val Array(brokers, topics) = args2
    val topicSet = Set(topics.split(","): _*)
    val groupId = "group1";
    val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount").setMaster("local[1]")
    sparkConf.set("spark.streaming.kafka.maxRatePerPartition", "1");
                // 优雅的停止 kill -15 pid
                sparkConf.set("spark.streaming.stopGracefullyOnShutdown", "true");
                // on yarn模式 时 该设置  driver程序只执行一次,不会失败重试
                sparkConf.set("spark.yarn.maxAppAttempts", "1");
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers, "group.id" -> groupId)
    val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd)
    val kc = new KafkaCluster(kafkaParams);
    //读取offset
    val fromOffsets: Map[TopicAndPartition, Long] = getOffset(kc, topicSet, groupId)
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, MessageAndMetadata[String, String]](ssc, kafkaParams, fromOffsets, messageHandler)
    messages.foreachRDD(rdd => {
      //业务处理
      rdd.foreachPartition(iter => { println(iter.foreach(one => { println(one.topic + "," + one.partition + "," + one.offset + "," + one.message()) })) })
      //保存kafka偏移量
      var offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      saveOffset(kc, offsetRanges, groupId)
    });
    ssc.start()
    ssc.awaitTermination()
  }

  def saveOffset(kc: KafkaCluster, offsetRanges: Array[OffsetRange], groupId: String): Unit = {
    var fromOffsets = Map[TopicAndPartition, Long]();
    offsetRanges.foreach { offsetRange => {
        fromOffsets += (offsetRange.topicAndPartition() -> offsetRange.untilOffset);
      }
    }
    saveOffset(kc, fromOffsets, groupId)
  }
  def saveOffset(kc: KafkaCluster, fromOffsets: Map[TopicAndPartition, Long], groupId: String): Unit = {
    kc.setConsumerOffsets(groupId, fromOffsets);
  }
  /**
   * 读取offset
   * 判断offset是否已被kafka清理
   */
  def getOffset(kc: KafkaCluster, topics: Set[String], groupId: String): Map[TopicAndPartition, Long] = {
                val kafkaPartitionsISet = kc.getPartitions(topics).right.get;
                val consumerOffsetsE = kc.getConsumerOffsets(groupId, kafkaPartitionsISet);
                var fromOffsets = Map[TopicAndPartition, Long]();
                var consumerOffsets:Map[TopicAndPartition, Long] = null;
                if (consumerOffsetsE.isRight) {
                        consumerOffsets = consumerOffsetsE.right.get;
                }
                val earliestLeaderOffsets = kc.getEarliestLeaderOffsets(kafkaPartitionsISet).right.get;
                earliestLeaderOffsets.foreach((topicAndPartitionEntry)=>{
                  val k = topicAndPartitionEntry._1
                  val leaderOffsetL = topicAndPartitionEntry._2.offset
                        if (consumerOffsets != null) {
                                val oValue = consumerOffsets.get(k);
                                val offsetObj = oValue.getOrElse(-1L);
                                val offset = Math.max(offsetObj, leaderOffsetL)
                                fromOffsets += (k->offset)
                        }
                })
    fromOffsets
  }
}


java代码

package liu.yuze.spark.streaming;

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

import kafka.common.TopicAndPartition;
import kafka.message.MessageAndMetadata;
import kafka.serializer.StringDecoder;

import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.HasOffsetRanges;
import org.apache.spark.streaming.kafka.KafkaCluster;
import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.kafka.OffsetRange;
import org.apache.spark.test.JavaScalaConverter;

import scala.Option;
import scala.collection.mutable.ArrayBuffer;
import scala.util.Either;

public class TestDirectKafka2 {

        public static void main(String[] args) {
                args = new String[] { "192.168.100.130:9091,192.168.100.130:9092", "test" };
                if (args.length < 2) {
                        System.err.println("输入参数错误");
                        System.exit(1);
                }
                boolean log4jInitialized = Logger.getRootLogger().getAllAppenders().hasMoreElements();
                if (!log4jInitialized) {
                        Logger.getRootLogger().setLevel(Level.WARN);
                }

                String brokers = args[0];
                String topics = args[1];
                final String groupId = "group1";
                SparkConf sparkConf = new SparkConf().setAppName("JavaDirectKafkaWordCount").setMaster("local");
                sparkConf.set("spark.streaming.kafka.maxRatePerPartition", "1");
                // 优雅的停止 kill -15 pid
                sparkConf.set("spark.streaming.stopGracefullyOnShutdown", "true");
                // on yarn模式 时 该设置  driver程序只执行一次,不会失败重试
                sparkConf.set("spark.yarn.maxAppAttempts", "1");
                JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));
                final HashMap<String, String> kafkaParams = new HashMap<>();
                kafkaParams.put("metadata.broker.list", brokers);
                kafkaParams.put("group.id", groupId);
                // 获取topic partition 和 偏移量
                Set<String> topicSet = new HashSet<String>(Arrays.asList(topics.split(",")));

                scala.collection.immutable.Map<String, String> kafkaParamsIMap = JavaScalaConverter.convertAsIMap(kafkaParams);
                final KafkaCluster kc = new KafkaCluster(kafkaParamsIMap);

                HashMap<TopicAndPartition, Long> fromOffsets = getOffset(kc, topicSet, groupId);
                Function<MessageAndMetadata<String, String>, MessageAndMetadata> function = new Function<MessageAndMetadata<String, String>, MessageAndMetadata>() {
                        private static final long serialVersionUID = 1L;

                        @Override
                        public MessageAndMetadata<String, String> call(MessageAndMetadata<String, String> v1) throws Exception {
                                return v1;
                        }
                };
                JavaInputDStream<MessageAndMetadata> createDirectStream2 = KafkaUtils.createDirectStream(jssc, String.class, String.class, StringDecoder.class, StringDecoder.class, MessageAndMetadata.class,
                                kafkaParams, fromOffsets, function);
                createDirectStream2.foreachRDD(new VoidFunction<JavaRDD<MessageAndMetadata>>() {

                        @Override
                        public void call(JavaRDD<MessageAndMetadata> rdd) throws Exception {
                                rdd.foreachPartition(new VoidFunction<Iterator<MessageAndMetadata>>() {

                                        @Override
                                        public void call(Iterator<MessageAndMetadata> t2) throws Exception {
                                                while (t2.hasNext()) {
                                                        MessageAndMetadata next = t2.next();
                                                        System.out.println(next);
                                                }
                                        }
                                });
                                OffsetRange[] offsetRanges = ((HasOffsetRanges) JavaRDD.toRDD(rdd)).offsetRanges();
                                saveOffset(kc, offsetRanges, groupId);
                        }
                });

                jssc.start();
                jssc.awaitTermination();
        }

        public static void saveOffset(KafkaCluster kc, OffsetRange[] offsetRanges, String groupId) {
                HashMap<TopicAndPartition, Object> fromOffsets = new HashMap<>();
                for (OffsetRange offsetRange : offsetRanges) {
                        fromOffsets.put(offsetRange.topicAndPartition(), offsetRange.untilOffset());
                }
                saveOffset(kc, fromOffsets, groupId);
        }
       
        public static void saveOffset(KafkaCluster kc, Map<TopicAndPartition, Object> fromOffsets, String groupId) {
                scala.collection.immutable.Map<TopicAndPartition, Object> convertAsIMap = JavaScalaConverter.convertAsIMap(fromOffsets);
                kc.setConsumerOffsets(groupId, convertAsIMap);
        }

        public static HashMap<TopicAndPartition, Long> getOffset(KafkaCluster kc, Set<String> topicSet, String groupId) {
                scala.collection.immutable.Set<String> topicISet = JavaScalaConverter.convertAsISet(topicSet);
                scala.collection.immutable.Set<TopicAndPartition> kafkaPartitionsISet = kc.getPartitions(topicISet).right().get();
                Either<ArrayBuffer<Throwable>, scala.collection.immutable.Map<TopicAndPartition, Object>> consumerOffsetsE = kc.getConsumerOffsets(groupId, kafkaPartitionsISet);
                Set<TopicAndPartition> kafkaPartitionsJSet = JavaScalaConverter.convertAsJSet(kafkaPartitionsISet);
                HashMap<TopicAndPartition, Long> fromOffsets = new HashMap<>();
                scala.collection.immutable.Map<TopicAndPartition, Object> consumerOffsets = null;
                if (consumerOffsetsE.isRight()) {
                        consumerOffsets = consumerOffsetsE.right().get();
                }
                scala.collection.immutable.Map<TopicAndPartition, LeaderOffset> earliestLeaderOffsets = kc.getEarliestLeaderOffsets(kafkaPartitionsISet).right().get();
                for (TopicAndPartition topicAndPartition : kafkaPartitionsJSet) {
                        long leaderOffsetL = -1;
                        Option<LeaderOffset> leaderOffsetO = earliestLeaderOffsets.get(topicAndPartition);
                        LeaderOffset leaderOffset = leaderOffsetO.getOrElse(null);
                        if (leaderOffset != null) {
                                leaderOffsetL = leaderOffset.offset();

                                long zkOffset = -1;
                                if (consumerOffsets != null) {
                                        Option<Object> oValue = consumerOffsets.get(topicAndPartition);
                                        Object offsetObj = oValue.getOrElse(null);
                                        if (offsetObj != null) {
                                                zkOffset = (Long) offsetObj;
                                        }
                                }
                                Long offset = Math.max(leaderOffsetL, zkOffset);
                                fromOffsets.put(topicAndPartition, offset);
                        } else {
                                fromOffsets.put(topicAndPartition, 0L);
                        }
                }
                return fromOffsets;
        }
}


package org.apache.spark.test

object JavaScalaConverter {

  def main(args: Array[String]): Unit = {
    println("dddd")
  }

  def convertAsIMap[K, V](jmap: java.util.Map[K, V]): collection.immutable.Map[K, V] = {
    var mMap = collection.JavaConversions.mapAsScalaMap(jmap)
    val iMap = collection.immutable.Map(mMap.toSeq: _*)
    iMap
  }

  def convertAsISet[K](jset: java.util.Set[K]): collection.immutable.Set[K] = {
    var mset = collection.JavaConversions.asScalaSet(jset)
    val iset = collection.immutable.Set(mset.toSeq: _*)
    iset
  }

  def convertAsJSet[K](iSet: collection.immutable.Set[K]): java.util.Set[K] = {
    var jset = collection.JavaConversions.setAsJavaSet(iSet)
    jset
  }
}



您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

站长推荐上一条 /4 下一条

QQ|小黑屋|about云开发-学问论坛|社区-大数据云技术学习分享平台 ( 京ICP备12023829号

GMT+8, 2018-7-20 04:41 , Processed in 0.446776 second(s), 31 queries , Gzip On.

Powered by Discuz! X3.2 Licensed

快速回复 返回顶部 返回列表