
KafkaUtils.createDirectStream parameter error

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import kafka.serializer.StringDecoder
import org.apache.log4j.{Level, Logger}

object Nginx {
  def main(args: Array[String]) {
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.ERROR)
    val conf = new SparkConf().setAppName("Nginx").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(3))
    val topics = Set("kafkatext")
    val groupId = "groupid"
    val brokers = "spark:9092"
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers, "group.id" -> groupId)
    // 0.8-style direct stream: keys and values are decoded with kafka.serializer.StringDecoder
    val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
    val lines = kafkaStream.map(_._2)
    // split each record on tabs, then take fields 2, 29 and 27 of each space-separated part
    val Nginx = lines.flatMap(line => line.split("\\t").map(part => (part.split(" ")(2), part.split(" ")(29), part.split(" ")(27))))
    Nginx.print()
    ssc.start()
    ssc.awaitTermination()
  }
}


warning: there was one deprecation warning; re-run with -deprecation for details
java.lang.NoSuchMethodError: kafka.api.TopicMetadata.errorCode()S
  at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$getPartitionMetadata$1$$anonfun$4.apply(KafkaCluster.scala:135)
  at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$getPartitionMetadata$1$$anonfun$4.apply(KafkaCluster.scala:135)
  at scala.collection.TraversableLike$$anonfun$filterImpl$1.apply(TraversableLike.scala:248)
  at scala.collection.Iterator$class.foreach(Iterator.scala:893)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
  at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
  at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
  at scala.collection.TraversableLike$class.filterImpl(TraversableLike.scala:247)
  at scala.collection.TraversableLike$class.filter(TraversableLike.scala:259)
  at scala.collection.AbstractTraversable.filter(Traversable.scala:104)
  at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$getPartitionMetadata$1.apply(KafkaCluster.scala:135)
  at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$getPartitionMetadata$1.apply(KafkaCluster.scala:133)
  at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:366)
  at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:362)
  at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
  at org.apache.spark.streaming.kafka.KafkaCluster.org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers(KafkaCluster.scala:362)
  at org.apache.spark.streaming.kafka.KafkaCluster.getPartitionMetadata(KafkaCluster.scala:133)
  at org.apache.spark.streaming.kafka.KafkaCluster.getPartitions(KafkaCluster.scala:120)
  at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:212)
  at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:485)



4 replies

bottlevil posted on 2018-7-3 18:28:47
When I run the code above, it always throws an error at KafkaUtils.createDirectStream. I've searched around and my code doesn't look any different from other people's examples. Could someone help me spot the mistake?

jixianqiuxue replied on 2018-7-3 19:02:38
Quoting bottlevil (2018-7-3 18:28): "When I run the code above, it always throws an error at KafkaUtils.createDirectStream ..."

Could it be a version mismatch? Kafka client programming has gone through several API generations: it used to be split into a high-level API and a low-level API, and later the two were unified.
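That kind of NoSuchMethodError is typical of such a mismatch: the spark-streaming-kafka-0-8 integration was compiled against an older kafka client than the one found on the classpath at runtime. As a rough sketch (the version numbers below are placeholders, not values confirmed in this thread; align them with your own Spark, Scala and Kafka versions), a build would normally declare exactly one of the two integration artifacts:

// build.sbt sketch -- hypothetical versions, adjust to your cluster.
// Pick exactly one Kafka integration artifact and keep it in step with Spark.

// Old direct stream (Kafka 0.8 consumer, kafka.serializer.StringDecoder):
// libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-8" % "2.3.0"

// New direct stream (Kafka 0.10+ consumer, org.apache.kafka StringDeserializer):
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming"            % "2.3.0" % "provided",
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.3.0"
)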

bottlevil replied on 2018-7-4 09:43:49
Quoting jixianqiuxue (2018-7-3 19:02): "Could it be a version mismatch? Kafka client programming has gone through several API generations ..."

OK, I'll take a look. Thanks!



bottlevil replied on 2018-7-4 14:15:10
Quoting jixianqiuxue (2018-7-3 19:02): "Could it be a version mismatch? Kafka client programming has gone through several API generations ..."

I looked into it; the problem was with my jar dependency and my spark-submit command.
My Spark is 2.3.0 and my Kafka is 2.11-1.1.0.
I should have been using the 0.10 integration package, but I was using the 0.8 one.
The code structure also had to change. After rewriting it as below, the job runs, but Spark still reads no data from Kafka:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.kafka.common.serialization.StringDeserializer

object Nginx {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Nginx").setMaster("local[2]")
    // build the StreamingContext straight from the SparkConf (no separate SparkContext needed)
    val ssc = new StreamingContext(conf, Seconds(3))
    val topics = Array("kafkatext")
    val groupId = "con-consumer-group"
    val brokers = "192.168.19.13:9092"
    val kafkaParams = Map(
      "bootstrap.servers" -> brokers,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupId,
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    // 0.10-style direct stream: records come back as ConsumerRecord[String, String]
    val lines = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams))
    val nginx = lines.map(_.value())
    // split each record on tabs, then take fields 2, 29 and 27 of each space-separated part
    val Nginx = nginx.flatMap(line => line.split("\\t").map(part => (part.split(" ")(2), part.split(" ")(29), part.split(" ")(27))))
    Nginx.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
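One thing worth noting in the parameters above (a common cause of "runs but prints nothing", not something confirmed in this thread): with "auto.offset.reset" -> "latest" and a consumer group that has never committed offsets, the stream only sees records produced after the job starts, so a topic that already holds data will look empty until new messages arrive. A minimal variation that makes a fresh group read the existing backlog:

// Hypothetical tweak: start a new consumer group from the beginning of the topic,
// so records already sitting in "kafkatext" appear in the first batches.
val kafkaParams = Map(
  "bootstrap.servers" -> brokers,
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> groupId,
  "auto.offset.reset" -> "earliest", // "latest" skips everything produced before startup
  "enable.auto.commit" -> (false: java.lang.Boolean)
)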


