分享

spark-streaming 计算hdfs数据

bigdatalixiwei 发表于 2017-4-10 20:27:06
首先自己犯了一个错误,自己的spark是单机版,首先要改成 sparkConf.setMaster("local[2]")或者
sparkConf.setMaster("local")
其次spark-streaming只会计算新放入hdfs目录的数据,比如说我们在启动下面的代码之前监控的hdfs目录中已经有文档了,那么不会进行计算,当我们有新的数据文档放入的时候才会启动计算 和打印计算数据。
再次,下面实现了把计算的数据放入hdfs,但是每次都是新建一个目录,上面的图片里边有显示,现在还没解决这个问题,有知道解决方案的可以告知一下,我想实现的是计算的结果放入一个文件,并且计算结果不断累加,而不是每次计算完成后在hdfs上新建一个文件夹

  def main(args: Array[String]) {
    if (args.length < 1) {
      System.err.println("Usage: HdfsWordCount <directory>")
      System.exit(1)
    }

    StreamingExamples.setStreamingLogLevels()
    val sparkConf = new SparkConf().setAppName("HdfsWordCount")
    sparkConf.setMaster("local[2]")

    // Create the context
    //spark://h1:7077
    val ssc = new StreamingContext(sparkConf, Seconds(10))
    //ssc.sparkContext.hadoopConfiguration.set("out put path",
    // Create the FileInputDStream on the directory and use the
    // stream to count words in new files created
    val lines = ssc.textFileStream(args(0))
    val words = lines.flatMap(_.split(","))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()
wordCounts.saveAsTextFiles("hdfs://h1:9000/data/result/sparkstreaminghdfswordcount")
     ssc.start()
    ssc.awaitTermination()
  }


QQ截图20170410202458.png
回复

使用道具 举报

12
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条