分享

spark-streaming 计算hdfs数据

代码如下:
def main(args: Array[String]) {
    if (args.length < 1) {
      System.err.println("Usage: HdfsWordCount <directory>")
      System.exit(1)
    }

    StreamingExamples.setStreamingLogLevels()
    val sparkConf = new SparkConf().setAppName("HdfsWordCount")
    sparkConf.setMaster("spark://h1:7077")
    // Create the context
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    // Create the FileInputDStream on the directory and use the
    // stream to count words in new files created
    val lines = ssc.textFileStream(args(0))
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()
    wordCounts.saveAsTextFiles("word", "count")

        ssc.start()
    ssc.awaitTermination()
  }

运行报错如下:
java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.ShuffledRDD

输入目录下只有一个文件,文件内容如下:
hank  you
what is your name
my name is lili
nice  to meet you

已有(10)人评论

跳转到指定楼层
starrycheng 发表于 2017-3-24 18:10:10
def main(args: Array[String]) {
    if (args.length < 1) {
      System.err.println("Usage: HdfsWordCount <directory>")
      System.exit(1)
    }

    StreamingExamples.setStreamingLogLevels()
    val sparkConf = new SparkConf().setAppName("HdfsWordCount")
    sparkConf.setMaster("spark://h1:7077")
    // Create the context
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    // Create the FileInputDStream on the directory and use the
    // stream to count words in new files created
    val lines = ssc.textFileStream(args(0))
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()
    wordCounts.saveAsTextFiles("word", "count")

        ssc.start()
    ssc.awaitTermination()
  }
saveAsTextFiles这个函数有问题,而且多个s



回复

使用道具 举报

NEOGX 发表于 2017-3-24 18:17:51
wordCounts.saveAsTextFiles("word", "count")如果是saveAsTextFile参数也不对,应该最起码有路径如下面
saveAsTextFiles("hdfs://ip:port/home/songsf/data/result1")
回复

使用道具 举报

bigdatalixiwei 发表于 2017-3-24 18:24:19
代码提示里边没有
QQ截图20170324182220.png
回复

使用道具 举报

bigdatalixiwei 发表于 2017-3-24 18:27:39
NEOGX 发表于 2017-3-24 18:17
wordCounts.saveAsTextFiles("word", "count")如果是saveAsTextFile参数也不对,应该最起码有路径如下面
...

我把这一行去掉了,还是报下面的错误
QQ截图20170324182220.png
回复

使用道具 举报

NEOGX 发表于 2017-3-24 18:30:24
bigdatalixiwei 发表于 2017-3-24 18:27
我把这一行去掉了,还是报下面的错误

用的哪个版本,是不是有重写的源码
回复

使用道具 举报

bigdatalixiwei 发表于 2017-3-24 18:33:16
NEOGX 发表于 2017-3-24 18:30
用的哪个版本,是不是有重写的源码

我的scala版本是2.11.8    spark版本是spark-2.0.2-bin-hadoop2.7      这个是\spark-2.0.2-bin-hadoop2.7\examples\src\main\scala\org\apache\spark\examples\streaming\HdfsWordCount.scala   里边的例子,自己运行就报错
回复

使用道具 举报

bigdatalixiwei 发表于 2017-3-24 18:34:10
starrycheng 发表于 2017-3-24 18:10
def main(args: Array[String]) {
    if (args.length < 1) {
      System.err.println("Usage: HdfsWo ...

代码提示里边 没有,我把这一行去掉后还是报错
QQ截图20170324182220.png
回复

使用道具 举报

NEOGX 发表于 2017-3-24 19:04:09
可能包引入不够
回复

使用道具 举报

NEOGX 发表于 2017-3-25 14:02:03
程序代码思路,没看出什么问题来。
import的什么都是
回复

使用道具 举报

12下一页
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条