分享

spark streaming saveAsTextFile的问题

Fortitude 发表于 2016-1-5 18:45:45 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 5 42008
saveAsTextFile保存到HDFS后,会生成part-00000之类的文件,并且如果文件存在的话会覆盖,那么spark streaming这种周期执行的代码,永远只会保留最后一次的写入结果,求教如何把每一次生成的文件都保存下来,也就是如何可以自定义保存时的文件名。

已有(5)人评论

跳转到指定楼层
regan 发表于 2016-1-6 09:20:01
saveAsTextFile(prefix,sufix)调用该方法 的时候可以指定文件夹的前缀及后缀。对于更改文件夹里面文件的名称,这个在API里面我还不知道怎么做,但是我可以给你提供一个思路,因为我之前就是这样做的。你可以单独起一个线程,这个线程放在Receiver里面,使用这个线程去遍历HDFS,然后读取part-00000文件中的内容,得到文件中内容后,将该内容作为文件名称。
回复

使用道具 举报

regan 发表于 2016-1-6 09:21:22
给你一个该思路实现,希望对你有帮助
new Thread("Thread to update HDFS File Name") {
      override def run(): Unit = {
        while (true) {
          try {
            val conf = new org.apache.hadoop.conf.Configuration()
            val hdfs = FileSystem.get(conf)
            val path = new Path(ProjectConf.resultPath)
            val status = hdfs.listStatus(path)
            for (a <- status) {
              val fileName = a.getPath.getName
              val filePath = a.getPath.toString
              if (fileName.startsWith("-")) {
                //println("hdfs file->" + a.getPath.toString)
                val filesStatus = hdfs.listStatus(new Path(filePath))
                val containFiles = new ArrayBuffer[String]()
                for (a <- filesStatus) {
                  containFiles.append(a.getPath.getName)
                }
                if (containFiles.contains("_SUCCESS")) {
                  val temp = containFiles.filter(a => !a.equals("_SUCCESS"))
                  if (temp.size > 0) {
                    val basePath = ProjectConf.resultPath + fileName + "/"
                    val partFilePath = basePath + temp(0)
                    val fileSize = hdfs.getFileStatus(new Path(partFilePath)).getLen
                    //文件大小为0
                    if (fileSize == 0) {
                      hdfs.delete(new Path(basePath), true)
                    }
                    //文件大小大于0
                    else {
                      val br = new BufferedReader(new InputStreamReader(hdfs.open(new Path(partFilePath))))
                      val line = br.readLine()
                      if (StrUtil.isNotEmpty(line)) {
                        val fields = line.split(",")
                        if (fields.length > 0) {
                          val timeH = fields(0).substring(0, 10)
                          //重命名
                          hdfs.rename(new Path(filePath), new Path(ProjectConf.resultPath + "mr_" + timeH + "_" + System.currentTimeMillis() + "/"))
                        }
                      }
                      br.close()
                    }
                  } else {
                    //删除只含_SUCCESS标志文件目录
                    hdfs.delete(new Path(ProjectConf.resultPath + fileName + "/"), true)
                  }
                  val successFilePath = ProjectConf.resultPath + fileName + "/" + "_SUCCESS"
                  if (hdfs.exists(new Path(successFilePath))) {
                    hdfs.delete(new Path(successFilePath), false)
                  }
                }
              }
            }
            hdfs.close()
            Thread.sleep(ProjectConf.renameHDFSFileDuration * 1000)
          } catch {
            case e: Exception => e.printStackTrace()
          }
        }
      }
    }.start()
回复

使用道具 举报

Fortitude 发表于 2016-1-6 10:50:39
regan 发表于 2016-1-6 09:20
saveAsTextFile(prefix,sufix)调用该方法 的时候可以指定文件夹的前缀及后缀。对于更改文件夹里面文件的名 ...

我是用的spark1.2的java api,rdd.saveAsTextFile方法没有前后缀参数。JavaPairDStream有saveAsNewAPIHadoopFiles方法,有前后缀参数,但是不能指定文件hdfs的路径。
回复

使用道具 举报

regan 发表于 2016-1-6 11:33:07
本帖最后由 regan 于 2016-1-6 11:34 编辑
Fortitude 发表于 2016-1-6 10:50
我是用的spark1.2的java api,rdd.saveAsTextFile方法没有前后缀参数。JavaPairDStream有saveAsNewAPIHad ...

KVRDD.saveAsTextFiles("hdfs://192.168.1.12:9000/home/","sufix")
哥老倌,HDFS路径不就是prefix指定的吗?
回复

使用道具 举报

Fortitude 发表于 2016-1-6 14:26:31
regan 发表于 2016-1-6 11:33
KVRDD.saveAsTextFiles("hdfs://192.168.1.12:9000/home/","sufix")
哥老倌,HDFS路径不就是prefix指定 ...

试了下,还真是,这参数名起得太有歧义了,谢谢哥老倌了
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条