分享

使用Spark Streaming + Kafka 实现有容错性的实时统计程序

Oner 发表于 2016-11-29 21:44:54 [显示全部楼层] 只看大图 回帖奖励 阅读模式 关闭右栏 1 12606
本帖最后由 Oner 于 2016-12-9 20:54 编辑
问题导读
1.  createStream和createDirectStream的优缺点是什么?
2.  为什么要使用checkpoint?如何使用?
3.  使用createDirectStream如何将kafka的偏移量存入zookeeper?

4.  如何从kafka中读取消费者的偏移量?
5.  如何从zookeeper读取保存的kafka的偏移量?
6.  实现整个程序的过程中可能碰到哪些问题?
7.  如何实现一个有容错性的实时统计程序?


因为时间原因,先将文章的整体框架写出来,之后会持续补充完整。

【——持续更新——】

使用spark streaming读取kafka

spark streaming提供了两种读取kafka数据的方式,createStream和createDirectStream。来看下官方的介绍.

Approach 1: Receiver-based Approach

This approach uses a Receiver to receive the data. The Receiver is implemented using the Kafka high-level consumer API. As with all receivers, the data received from Kafka through a Receiver is stored in Spark executors, and then jobs launched by Spark Streaming processes the data.

However, under default configuration, this approach can lose data under failures (see receiver reliability. To ensure zero-data loss, you have to additionally enable Write Ahead Logs in Spark Streaming (introduced in Spark 1.2). This synchronously saves all the received Kafka data into write ahead logs on a distributed file system (e.g HDFS), so that all the data can be recovered on failure. See Deploying section in the streaming programming guide for more details on Write Ahead Logs.
[mw_shl_code=scala,true]import org.apache.spark.streaming.kafka._

val kafkaStream = KafkaUtils.createStream(streamingContext,
     [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])[/mw_shl_code]
Points to remember:
  • Topic partitions in Kafka does not correlate to partitions of RDDs generated in Spark Streaming. So increasing the number of topic-specific partitions in the KafkaUtils.createStream() only increases the number of threads using which topics that are consumed within a single receiver. It does not increase the parallelism of Spark in processing the data. Refer to the main document for more information on that.
  • Multiple Kafka input DStreams can be created with different groups and topics for parallel receiving of data using multiple receivers.
  • If you have enabled Write Ahead Logs with a replicated file system like HDFS, the received data is already being replicated in the log. Hence, the storage level in storage level for the input stream to StorageLevel.MEMORY_AND_DISK_SER (that is, use KafkaUtils.createStream(..., StorageLevel.MEMORY_AND_DISK_SER)).

简单地总结下上面的意思:

Receiver-based是基于kafka高级consumer API实现的。使用Receiver将从kafka接收到的数据存入spark executor中,然后由spark streaming启动job来处理数据。但是这种方式在默认情况下可能会导致数据丢失,为了防止数据丢失,必须启动spark streaming的WAL机制(Write Ahead Logs),它会同步地将所有从kafka接收到的数据保存在分布式文件系统中(如HDFS),这样出错时也可以恢复。

需要注意几点:
1. kafka的topic的partition跟spark streaming中生成RDD的partition不相关。增加KafkaUtils.createStream()中特定主题分区的数量只会增加在单个接收器中使用的主题的线程数,并不会增加spark streaming中处理数据的并行度。

2. 可以使用不同的kafka consumer group和topic来创建多个kafka输入DStream,这样就可以使用多个Receiver来并行的接收kafka数据。

3. 如果启用了WAL机制,需要将storage level设置为MEMORY_AND_DISK_SER,可以通过KafkaUtils.createStream(..., StorageLevel.MEMORY_AND_DISK_SER)来设置。

Approach 2: Direct Approach (No Receivers)

This new receiver-less “direct” approach has been introduced in Spark 1.3 to ensure stronger end-to-end guarantees. Instead of using receivers to receive data, this approach periodically queries Kafka for the latest offsets in each topic+partition, and accordingly defines the offset ranges to process in each batch. When the jobs to process the data are launched, Kafka’s simple consumer API is used to read the defined ranges of offsets from Kafka (similar to read files from a file system). Note that this is an experimental feature introduced in Spark 1.3 for the Scala and Java API, in Spark 1.4 for the Python API.
This approach has the following advantages over the receiver-based approach (i.e. Approach 1).
  • Simplified Parallelism: No need to create multiple input Kafka streams and union them. With directStream, Spark Streaming will create as many RDD partitions as there are Kafka partitions to consume, which will all read data from Kafka in parallel. So there is a one-to-one mapping between Kafka and RDD partitions, which is easier to understand and tune.
  • Efficiency: Achieving zero-data loss in the first approach required the data to be stored in a Write Ahead Log, which further replicated the data. This is actually inefficient as the data effectively gets replicated twice - once by Kafka, and a second time by the Write Ahead Log. This second approach eliminates the problem as there is no receiver, and hence no need for Write Ahead Logs. As long as you have sufficient Kafka retention, messages can be recovered from Kafka.
  • Exactly-once semantics: The first approach uses Kafka’s high level API to store consumed offsets in Zookeeper. This is traditionally the way to consume data from Kafka. While this approach (in combination with write ahead logs) can ensure zero data loss (i.e. at-least once semantics), there is a small chance some records may get consumed twice under some failures. This occurs because of inconsistencies between data reliably received by Spark Streaming and offsets tracked by Zookeeper. Hence, in this second approach, we use simple Kafka API that does not use Zookeeper. Offsets are tracked by Spark Streaming within its checkpoints. This eliminates inconsistencies between Spark Streaming and Zookeeper/Kafka, and so each record is received by Spark Streaming effectively exactly once despite failures. In order to achieve exactly-once semantics for output of your results, your output operation that saves the data to an external data store must be either idempotent, or an atomic transaction that saves results and offsets (see Semantics of output operations in the main programming guide for further information).

[mw_shl_code=scala,true] import org.apache.spark.streaming.kafka._

val directKafkaStream = KafkaUtils.createDirectStream[
     [key class], [value class], [key decoder class], [value decoder class] ](
     streamingContext, [map of Kafka parameters], [set of topics to consume])[/mw_shl_code]
Note that one disadvantage of this approach is that it does not update offsets in Zookeeper, hence Zookeeper-based Kafka monitoring tools will not show progress. However, you can access the offsets processed by this approach in each batch and update Zookeeper yourself (see below).
[mw_shl_code=scala,true]// Function to create and setup a new StreamingContext
def functionToCreateContext(): StreamingContext = {
    val ssc = new StreamingContext(...)   // new context
    val lines = ssc.socketTextStream(...) // create DStreams
    ...
    ssc.checkpoint(checkpointDirectory)   // set checkpoint directory
    ssc
}

// Get StreamingContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)

// Do additional setup on context that needs to be done,
// irrespective of whether it is being started or restarted
context. ...

// Start the context
context.start()
context.awaitTermination()[/mw_shl_code]

简单总结下上面的意思:

使用direct方式会定期的查询kafka topic+partition的最新的offset,相应地会在spark streaming中定义每批要处理数据的offset range。当处理数据的job启动后,会使用低级的consumer API来来读取offset range的数据(类似于从一个文件中读取文件)。

这种方式相比于receiver- based方式有以下几个优势:
1. 简化并行度:
不需要创建多个kafka输入DStream然后union。Spark Streaming将创建与要消费的Kafka分区一样多的RDD分区,这将从Kafka并行读取数据。

2. 高效:
使用receiver-based方式来防止数据丢失需要将数据写入到HDFS中,这会多一步复制数据。这样效率会低,因为数据被复制了两次,一次是kafka,另一次是WAL。direct方式消除了这个问题,因为没有receiver,所以不需要WAL,数据恢复只需要由kafka完成即可。

3. Exactly-once语义:
使用receiver-based方式可以保证数据不丢失(满足at-least once语义),但是在一些小概率情况下,一些记录可能在一些故障下处理两次,这是因为Spark Streaming可靠接收的数据与Zookeeper跟踪的偏移之间存在不一致。因此在direct方式中,使用kafka低级consumer,将offset由spark streaming维护,存入到checkpoint中,这就避免了offset在spark streaming和kafka/zookeeper之间的不一致,所以即便在出现故障的情况下,数据也只会被spark streaming有效地接受一次。为了实现Exactly-once语义,将数据保存到外部数据存储的输出操作必须是幂等的,或者是保存结果和偏移量的原子事务。

但是使用这种方式不会更新zookeeper中的offset,因此基于zookeeper的kafka监测工具会失效,但是可以自己得到每批数据在kafka中的offset,并自己更新zookeeper中的offset。

spark streaming checkpoint机制

spark streaming程序应该是7*24小时地跑,所以必须要有一个针对于程序逻辑无关错误(系统错误,JVM崩溃等)的容错机制。checkpoint会将足够多的信息存放在一个容错文件系统上(如HDFS),以便出促使能快速恢复。有两种类型的数据可以checkpoint。

1. Metadata checkpointing
将流式计算的信息保存到具备容错性的存储上如HDFS,Metadata Checkpointing适用于当streaming应用程序Driver所在的节点出错时能够恢复,元数据包括:
Configuration(配置信息) - 创建streaming应用程序的配置信息
DStream operations - 在streaming应用程序中定义的DStreaming操作
Incomplete batches - 在列队中没有处理完的作业

2. Data checkpointing
将生成的RDD保存到外部可靠的存储当中,对于一些数据跨度为多个bactch的有状态tranformation操作来说,checkpoint非常有必要,因为在这些transformation操作生成的RDD对前一RDD有依赖,随着时间的增加,依赖链可能会非常长,checkpoint机制能够切断依赖链,将中间的RDD周期性地checkpoint到可靠存储当中,从而在出错时可以直接从checkpoint点恢复。
具体来说,metadata checkpointing主要还是从drvier失败中恢复,而Data Checkpoing用于对有状态的transformation操作进行checkpointing

在程序中想要使用checkpoint机制,可以通过streamingContext.checkpoint(checkpointDirectory)来实现。如果想要spark streaming应用能够从driver故障中恢复,需要满足以下两个条件:
1.  当第一次启动spark streaming程序时,需要创建一个StreamingContext,然后设置所有的DStream,然后启动。
2.  当spark streaming失败后重启时,需要从checkpoint目录中的checkpoint数据中恢复一个StreamingContext。

spark提供了一个能满足上面条件的函数,StreamingContext.getOrCreate.
[mw_shl_code=scala,true]// Function to create and setup a new StreamingContext
def functionToCreateContext(): StreamingContext = {
    val ssc = new StreamingContext(...)   // new context
    val lines = ssc.socketTextStream(...) // create DStreams
    ...
    ssc.checkpoint(checkpointDirectory)   // set checkpoint directory
    ssc
}

// Get StreamingContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)

// Do additional setup on context that needs to be done,
// irrespective of whether it is being started or restarted
context. ...

// Start the context
context.start()
context.awaitTermination()[/mw_shl_code]

如果checkpointDirectory存在,那么将从checkpoint数据重新创建StreamingContext。如果目录不存在(即,第一次运行),则将调用函数functionToCreateContext以创建新的StreamingContext并设置DStreams。
除了使用getOrCreate,还需要确保驱动程序进程在失败时自动重新启动。这只能由用于运行应用程序的部署基础结构完成。这将在部署部分进一步讨论。

需要注意的一点是,使用checkpoint机制可能会导致进行checkpoint操作的那批RDD的处理时间增加,因此,需要设置一个合理的checkpoint时间间隔。官方有个推荐,DStream的5 - 10个滑动间隔的检查点间隔是一个很好的尝试设置。


读取kafka中保存的offset

如果streaming长时间没有启动,那么保存在zookeeper中的offset很可能过期了,如果offset过期了,程序运行时是会出错的,所以这时候需要从kafka中读取最早的offset来进行比较。那么如何读取kafka中某个时间点的offset呢?
[mw_shl_code=scala,true]  private def getOffsetFromKafka(zkServers: String, tp: TopicAndPartition, time: Long):Long = {
    val zkClient = new ZkClient(zkServers,Integer.MAX_VALUE,10000, ZKStringSerializer)
    val request = OffsetRequest(Map(tp -> PartitionOffsetRequestInfo(time, 1)))

    //  得到每个分区的leader(某个broker)
    ZkUtils.getLeaderForPartition(zkClient, tp.topic, tp.partition) match {
      case Some(brokerId) =>
        // 根据brokerId得到leader这个broker的详细信息
        ZkUtils.getBrokerInfo(zkClient, brokerId) match {
          case Some(broker) =>
            // 使用leader的host和port来创建一个SimpleConsumer
            val consumer = new SimpleConsumer(broker.host, broker.port, 10000, 100000, "getOffsetShell")
            val offsets = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(tp).offsets
            offsets.head
          case None =>
            throw new BrokerNotAvailableException("Broker id %d does not exist".format(brokerId))
        }
      case None =>
        throw new Exception("No broker for partition %s - %s".format(tp.topic, tp.partition))
    }

  }[/mw_shl_code]


读取Zookeeper中保存的offset

Zookeeper中保存kafka的offset的目录在 /consumers/[groupId]/offsets/[topic]/partition下,那如何读取到呢?
[mw_shl_code=scala,true]  def getOffsets(zkServers: String, groupId: String, topics: String): Map[TopicAndPartition, Long] ={
    //  val zkClient = new ZkClient(zkServers)
    // 必须要使用带有ZkSerializer参数的构造函数来构造,否则在之后使用ZkUtils的一些方法时会出错,而且在向zookeeper中写入数据时会导致乱码
    // org.I0Itec.zkclient.exception.ZkMarshallingError: java.io.StreamCorruptedException: invalid stream header: 7B227665
    val zkClient = new ZkClient(zkServers,Integer.MAX_VALUE,10000, ZKStringSerializer)

    val topicPartitions = ZkUtils.getPartitionsForTopics(zkClient, topics.split(",")).head

    val topic = topicPartitions._1
    val partitions = topicPartitions._2

    val topicDirs = new ZKGroupTopicDirs(groupId, topic)

    var offsetsMap: Map[TopicAndPartition, Long] = Map()

    partitions.foreach { partition =>
      val zkPath = s"${topicDirs.consumerOffsetDir}/$partition" // /consumers/[groupId]/offsets/[topic]/partition
//      ZkUtils.makeSurePersistentPathExists(zkClient, zkPath) // 如果zookeeper之前不存在该目录,就直接创建

      val tp = TopicAndPartition(topic, partition)
      // 得到kafka中该partition的最早时间的offset
      val offsetForKafka = getOffsetFromKafka(zkServers, tp, OffsetRequest.EarliestTime)

      // 得到zookeeper中存储的该partition的offset
      val offsetForZk = ZkUtils.readDataMaybeNull(zkClient, zkPath) match {
        case (Some(offset), stat) =>
          Some(offset)
        case (None, stat) =>  // zookzeeper中未存储偏移量
          None
      }


      if (offsetForZk.isEmpty || offsetForZk.get.toLong < offsetForKafka){// 如果zookzeeper中未存储偏移量或zookzeeper中存储的偏移量已经过期
        println("Zookeeper don't save offset or offset has expire!")
        offsetsMap += (tp -> offsetForKafka)
      }else{
        offsetsMap += (tp -> offsetForZk.get.toLong)
      }

    }
    println(s"offsets: $offsetsMap")
    offsetsMap
  }[/mw_shl_code]

问题汇总

在实现这整个过程中,躺了很多坑,现在总结下。
1.  ERROR StreamingContext: Error starting the context, marking it as stopped
org.apache.spark.SparkException: org.apache.spark.streaming.kafka.DirectKafkaInputDStream@31a136a6 has not been initialized

在实现createContext方法时发现该方法包含的内容特别多(包含了ssc.checkpoint()和DStream的一些操作),所以就想精简下,按照官方该方法字面意思的理解,就是创建一个StreamingContext,然后返回就可以了,所以就尝试把ssc.checkpoint()和DStream的操作放到main方法中,也就说createContext中只包含了StreamingContext的创建,然后直接返回接口ssc。但是这要操作会有问题,第一次运行程序是不会出错的,但是我强制中断进程后,再次启动它,会出现一个无法启动StreamingContext异常:ERROR StreamingContext: Error starting the context, marking it as stopped
org.apache.spark.SparkException: org.apache.spark.streaming.kafka.DirectKafkaInputDStream@31a136a6 has not been initialized

2. ERROR StreamingContext: Error starting the context, marking it as stopped
java.io.NotSerializableException: DStream checkpointing has been enabled but the DStreams with their functions are not serializable

由于需要将DStream(本质上是KafkaRDD)中所包含的offset写到Zookeeper中,所以需要用到DStream的foreachRDD输出操作,这个操作官方有介绍,
意思是说建立连接时必须放在worker上执行,不能放在driver中执行。最开始时,ZkClient和ZKGroupTopicDirs建立是在createContext方法中,并在DStream.foreachRDD之前建立,但是执行时会出现异常:ERROR StreamingContext: Error starting the context, marking it as stopped

java.io.NotSerializableException: DStream checkpointing has been enabled but the DStreams with their functions are not serializable
之后将ZkClient和ZKGroupTopicDirs的建立放在了foreachRDD操作中,
[mw_shl_code=scala,true]    messages.foreachRDD{ rdd =>
      val zkClient = new ZkClient(zkServers)
      val topicDirs = new ZKGroupTopicDirs(groupId, topics)
      
      for (offset <- offsetRanges) {
        val zkPath = s"${topicDirs.consumerOffsetDir}/${offset.partition}"
        ZkUtils.updatePersistentPath(zkClient, zkPath, offset.untilOffset.toString)
      }
    }[/mw_shl_code]
但是仍然有个疑问,由于在foreachRDD操作算子中,没有针对RDD做具体操作,按照官方示例代码,ZkClient和ZKGroupTopicDirs的建立应该还是在driver端进行的,但是程序执行时不会报错,这个问题暂时没有考虑清楚是什么原因。

3. offset存入zookeeper乱码问题
在将offset存入zookeeper中,需要建立ZkClient,如果使用ZkClient(String serverstring)构造方法来创建ZkClient,最后在zookeeper客户端使用get命令来得到偏移量时,会发现在偏移量前会有乱码文字,如图。
20161209141317 .jpg
为解决这个问题,可以在建立ZkClient时,使用ZkClient(String zkServers, int sessionTimeout, int connectionTimeout, ZkSerializer zkSerializer)构造方法来创建。


实时统计过去指定时间内某个ip的访问次数

上面已经讲解了如何让spark streaming有一个容错性,现在要利用上面讲的知识来实现一个能够实时统计指定时间内某个ip的访问次数的小程序。

附完整代码:
[mw_shl_code=scala,true]package com.zdm.streaming

/**
  * 1. 程序创建StreamingContext时,如果存在checkpoint目录,则直接根据checkpoint目录中的数据恢复一个StreamingContext,
  * 如果不存在,则会从zookeeper中读取当前consumer groupId消费的offset,然后再从kafka中读取到该topic下最早的的offset,
  * 如果从zookeeper中读取到的offset为空或者小于从kafka中读取到该topic下最早的offset,则采用从kafka中读取的offset,
  * 否则就用zookeeper中存储的offset。
  * 2. 每次生成KafkaRDD之后,都会将它对应的OffsetRange中的untilOffset存入到zookeeper
  */

import java.io.File
import java.util.{ArrayList => JavaArrayList}

import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo}
import kafka.common.{BrokerNotAvailableException, TopicAndPartition}
import kafka.consumer.SimpleConsumer
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import kafka.utils.{ZKGroupTopicDirs, ZKStringSerializer, ZkUtils}
import org.I0Itec.zkclient.ZkClient
import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}
import org.json4s._
import org.json4s.native.JsonMethods._

//import scala.collection.JavaConversions._
//import collection.convert.wrapAsScala._
/**
  * Created by wangwei01 on 2016/11/18.
  *
  * Consumes messages from one or more topics in Kafka and does wordcount.
  * Usage: DirectKafkaWordCount <brokers> <topics>
  *   <brokers> is a list of one or more Kafka brokers
  *   <topics> is a list of one or more kafka topics to consume from
  *   <checkpointDirectory> is a string of directory for checkpoint
  *
  * Example:
  *    $ spark-submit --master spark://hm01:8070 \
  *         --packages org.apache.spark:spark-streaming_2.10:1.6.1,org.apache.spark:spark-streaming-kafka_2.10:1.6.1,org.json4s:json4s-native_2.10:3.4.0,com.alibaba:druid:1.0.26  \
  *         resys.jar \
  *         ukafka-4rieyu-2-bj03.service.ucloud.cn:2181 ukafka-4rieyu-2-bj03.service.ucloud.cn:9092 proxy-access-log spark /wv/ch 5 20 20
  */
object RecoverableDirectKafkaWordCount {

  case class Agent(http_user_agent: String)
  case class RequestTime(time_local: String)
  case class IP(client_real_ip: String)

  implicit val formats = DefaultFormats

  def createContext(zkServers: String, brokers: String, topics: String,
                    groupId: String, checkpointDirectory: String, batchDuration: Int,
                    windowDuration: Int, slideDuration: Int): StreamingContext = {

    // If you do not see this printed, that means the StreamingContext has been loaded
    // from the new checkpoint
    println("-----------------Creating new context!-----------------")
    val checkpointFile = new File(checkpointDirectory)
    if (checkpointFile.exists()) checkpointFile.delete()

    val sparkConf = new SparkConf()
      .setAppName("RecoverableDirectKafkaWordCount")
      .set("spark.streaming.kafka.maxRatePerPartition", "1")

    // Create the context with a specify second batch size
    val ssc = new StreamingContext(sparkConf, Seconds(batchDuration))
    ssc.checkpoint(checkpointDirectory)

    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> brokers,
      "group.id" -> groupId
//            "auto.offset.reset" -> OffsetRequest.SmallestTimeString
    )

    val offsets = getOffsets(zkServers, groupId, topics)
    val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.topic, mmd.message())  //定义kafka接收的数据格式
//    val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message())

    // Create direct kafka stream with brokers and topics
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
        ssc, kafkaParams, offsets, messageHandler)

    // Hold a reference to the current offset ranges, so it can be used downstream
    var offsetRanges = Array[OffsetRange]()

    val sta = messages.transform { rdd =>
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd
    }.map(_._2)
      .filter(line => line.contains("client_real_ip"))
      .map(line => parse(line).extract[IP].client_real_ip)
      .map(ip => (ip, 1))
      .reduceByKeyAndWindow((x: Int, y: Int) => x + y, Seconds(windowDuration), Seconds(slideDuration))
      .transform(rdd => rdd.sortBy(_._2, ascending = false)) // 按照访问次数降序排列


    // 将kafkaRDD的untilOffset更新到zookeeper中
    messages.foreachRDD{ rdd =>
      // val zkClient = new ZkClient(zkServers)
      val zkClient = new ZkClient(zkServers,Integer.MAX_VALUE,10000, ZKStringSerializer)
      val topicDirs = new ZKGroupTopicDirs(groupId, topics)
//      println(s"rdd: $rdd")
      for (offset <- offsetRanges) {
        val zkPath = s"${topicDirs.consumerOffsetDir}/${offset.partition}"
        ZkUtils.updatePersistentPath(zkClient, zkPath, offset.untilOffset.toString)
//        println(s"[${offset.topic},${offset.partition}]: ${offset.fromOffset},${offset.untilOffset}")
      }
    }

    sta.print()

    ssc
  }

  def main(args: Array[String]): Unit = {
        if (args.length < 8) {
          System.err.println("Your arguments were " + args.mkString("[", ", ", "]"))
          System.err.println(s"""
            |Usage: RecoverableDirectKafkaWordCount <brokers> <topics> <groupId> <checkpointDirectory> <batchDuration> <windowDuration> <slideDuration>
            |  <brokers> is a list of one or more Kafka brokers
            |  <topics> is a list of one or more kafka topics to consume from
            |  <groupId> is a kafka consumer group
            |  <checkpointDirectory> is a string of directory for checkpoint
            |  <batchDuration> is the time interval at which streaming data will be divided into batches
            |  <windowDuration> is width of the window; must be a multiple of this DStream's batching interval
            |  <slideDuration> sliding interval of the window (i.e., the interval after which the new DStream will generate RDDs); must be a multiple of this DStream's batching interval
            """.stripMargin)
          System.exit(1)
        }

//    val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
    Logger.getRootLogger.setLevel(Level.WARN)

    val Array(zkServers, brokers, topics, groupId, checkpointDirectory, batchDurationStr, windowDurationStr, slideDurationStr) = args
    val batchDuration = batchDurationStr.toInt
    val windowDuration = windowDurationStr.toInt
    val slideDuration = slideDurationStr.toInt


    val ssc = StreamingContext.getOrCreate(checkpointDirectory,
      () => createContext(zkServers, brokers, topics, groupId, checkpointDirectory, batchDuration, windowDuration, slideDuration))


    ssc.start()
    ssc.awaitTermination()

  }


  def getOffsets(zkServers: String, groupId: String, topics: String): Map[TopicAndPartition, Long] ={
    //  val zkClient = new ZkClient(zkServers)
    // 必须要使用带有ZkSerializer参数的构造函数来构造,否则在之后使用ZkUtils的一些方法时会出错,而且在向zookeeper中写入数据时会导致乱码
    // org.I0Itec.zkclient.exception.ZkMarshallingError: java.io.StreamCorruptedException: invalid stream header: 7B227665
    val zkClient = new ZkClient(zkServers,Integer.MAX_VALUE,10000, ZKStringSerializer)

    val topicPartitions = ZkUtils.getPartitionsForTopics(zkClient, topics.split(",")).head

    val topic = topicPartitions._1
    val partitions = topicPartitions._2

    val topicDirs = new ZKGroupTopicDirs(groupId, topic)

    var offsetsMap: Map[TopicAndPartition, Long] = Map()

    partitions.foreach { partition =>
      val zkPath = s"${topicDirs.consumerOffsetDir}/$partition" // /consumers/[groupId]/offsets/[topic]/partition
//      ZkUtils.makeSurePersistentPathExists(zkClient, zkPath) // 如果zookeeper之前不存在该目录,就直接创建

      val tp = TopicAndPartition(topic, partition)
      // 得到kafka中该partition的最早时间的offset
      val offsetForKafka = getOffsetFromKafka(zkServers, tp, OffsetRequest.EarliestTime)

      // 得到zookeeper中存储的该partition的offset
      val offsetForZk = ZkUtils.readDataMaybeNull(zkClient, zkPath) match {
        case (Some(offset), stat) =>
          Some(offset)
        case (None, stat) =>  // zookzeeper中未存储偏移量
          None
      }

      if (offsetForZk.isEmpty || offsetForZk.get.toLong < offsetForKafka){// 如果zookzeeper中未存储偏移量或zookzeeper中存储的偏移量已经过期
        println("Zookeeper don't save offset or offset has expire!")
        offsetsMap += (tp -> offsetForKafka)
      }else{
        offsetsMap += (tp -> offsetForZk.get.toLong)
      }

    }
    println(s"offsets: $offsetsMap")
    offsetsMap
  }

  private def getOffsetFromKafka(zkServers: String, tp: TopicAndPartition, time: Long):Long = {
    val zkClient = new ZkClient(zkServers,Integer.MAX_VALUE,10000, ZKStringSerializer)
    val request = OffsetRequest(Map(tp -> PartitionOffsetRequestInfo(time, 1)))

    //  得到每个分区的leader(某个broker)
    ZkUtils.getLeaderForPartition(zkClient, tp.topic, tp.partition) match {
      case Some(brokerId) =>
        // 根据brokerId得到leader这个broker的详细信息
        ZkUtils.getBrokerInfo(zkClient, brokerId) match {
          case Some(broker) =>
            // 使用leader的host和port来创建一个SimpleConsumer
            val consumer = new SimpleConsumer(broker.host, broker.port, 10000, 100000, "getOffsetShell")
            val offsets = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(tp).offsets
            offsets.head
          case None =>
            throw new BrokerNotAvailableException("Broker id %d does not exist".format(brokerId))
        }
      case None =>
        throw new Exception("No broker for partition %s - %s".format(tp.topic, tp.partition))
    }

  }

}


[/mw_shl_code]
程序执行截图: 20161209205336.jpg

已有(1)人评论

跳转到指定楼层
PeersLee 发表于 2017-2-11 13:19:40
楼主,这个项目的依赖是什么,就是,如果是maven构建,pom.xml 是什么?希望解答一下。
回复

使用道具 举报

关闭

推荐上一条 /2 下一条