分享

Structured Streaming Kafka Offset

zstu 2018-6-28 14:55:07 发表于 疑问解答 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 4 8358
hi,想问一下大家有用过Structured Streaming消费kafka数据吗?它能自己管理kafka offset吗?

已有(4)人评论

跳转到指定楼层
yuwenge 发表于 2018-6-28 16:04:14
可以的:针对 Kafka 0.10 提供了 Structured Streaming 集成,以读取数据,并将数据写入 Kafka。


可以从最新或则指定offset开始消费数据
从最新offset开始消费
[mw_shl_code=scala,true]def main(args: Array[String]): Unit = {
        val kafkaParams = Map[String, Object](
          "bootstrap.servers" -> "localhost:9092",
          "key.deserializer" -> classOf[StringDeserializer],
          "value.deserializer" -> classOf[StringDeserializer],
          "group.id" -> "use_a_separate_group_id_for_each_stream",
          "auto.offset.reset" -> "latest",
          "enable.auto.commit" -> (false: java.lang.Boolean)
        )

        val ssc =new StreamingContext(OpContext.sc, Seconds(2))
        val topics = Array("test")
        val stream = KafkaUtils.createDirectStream[String, String](
          ssc,
          PreferConsistent,
          Subscribe[String, String](topics, kafkaParams)
        )
        stream.foreachRDD(rdd=>{
          val offsetRanges=rdd.asInstanceOf[HasOffsetRanges].offsetRanges
          rdd.foreachPartition(iter=>{
                val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
                println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
          })
          stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
        })

//    stream.map(record => (record.key, record.value)).print(1)
        ssc.start()
        ssc.awaitTermination()
  }[/mw_shl_code]


从指定的offset开始消费
[mw_shl_code=scala,true]def main(args: Array[String]): Unit = {
  val kafkaParams = Map[String, Object](
        "bootstrap.servers" -> "localhost:9092",
        "key.deserializer" -> classOf[StringDeserializer],
        "value.deserializer" -> classOf[StringDeserializer],
        "group.id" -> "use_a_separate_group_id_for_each_stream",
        //      "auto.offset.reset" -> "latest",
        "enable.auto.commit" -> (false: java.lang.Boolean)
  )
  val ssc = new StreamingContext(OpContext.sc, Seconds(2))
  val fromOffsets = Map(new TopicPartition("test", 0) -> 1100449855L)
  val stream = KafkaUtils.createDirectStream[String, String](
        ssc,
        PreferConsistent,
        Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets)
  )

  stream.foreachRDD(rdd => {
        val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        for (o <- offsetRanges) {
          println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
        }
        stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
  })

  //    stream.map(record => (record.key, record.value)).print(1)
  ssc.start()
  ssc.awaitTermination()
}[/mw_shl_code]



推荐参考
Structured Streaming + Kafka 集成指南 (Kafka broker 版本 0.10.0 或更高)

http://spark.apachecn.org/docs/c ... ka-integration.html

Spark2.11 两种流操作 + Kafka

http://blog.51cto.com/13064681/1943431





回复

使用道具 举报

zstu 发表于 2018-6-28 16:11:10
yuwenge 发表于 2018-6-28 16:04
可以的:针对 Kafka 0.10 提供了 Structured Streaming 集成,以读取数据,并将数据写入 Kafka。

谢谢。你这个代码是spark Streaming的吧。我是想Structured Streaming有没有想spark Streaming那样可以自己管理offset
回复

使用道具 举报

yuwenge 发表于 2018-6-28 16:54:02
zstu 发表于 2018-6-28 16:11
谢谢。你这个代码是spark Streaming的吧。我是想Structured Streaming有没有想spark Streaming那样可以自 ...

//订阅多个主题,指定显式kafka偏移



[mw_shl_code=scala,true]val df = spark

.read

.format("kafka")

.option("kafka.bootstrap.servers", "host1:port1,host2:port2")

.option("subscribe", "topic1,topic2")

.option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""")

.option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""")

.load()

df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

.as[(String, String)][/mw_shl_code]



//订阅模式,最早和最新偏移量


第二个链接其实有的,对应的代码如下

[mw_shl_code=scala,true]val df = spark

  .read

  .format("kafka")

  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")

  .option("subscribePattern", "topic.*")

  .option("startingOffsets", "earliest")

  .option("endingOffsets", "latest")

  .load()

df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

  .as[(String, String)][/mw_shl_code]

回复

使用道具 举报

zstu 发表于 2018-6-28 18:48:12
yuwenge 发表于 2018-6-28 16:54
//订阅多个主题,指定显式kafka偏移

option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""")中"topic1":{"0":50,"1":-1}是指每次topic1的partition0增量消费50条数据吗?
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条