分享

解惑:Spark Streaming 数据导入hbase的困惑

package cn.chinahadoop.streaming

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
* Chen Chao
*/
object HdfsWordCount {
  def main(args: Array[String]) {
    if (args.length < 3) {
      System.err.println("Usage: HdfsWordCount <master> <directory> <seconds>")
      System.exit(1)
    }

    StreamingExamples.setStreamingLogLevels()

    val sparkConf = new SparkConf().setAppName("HdfsWordCount").setMaster(args(0))

    val sc = new SparkContext(sparkConf)

    //新建StreamingContext
val ssc=new StreamingContext(sc,Seconds(args(2).toInt))
    ssc.checkpoint(".")

    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
      val currentCount = values.foldLeft(0)(_ + _)
      val previousCount = state.getOrElse(0)
      Some(currentCount + previousCount)
    }

    //创建FileInputDStream,并指向特定目录
val lines = ssc.textFileStream(args(1))
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
        //.updateStateByKey[Int](updateFunc)
    //wordCounts.print()
    //数据保存到hbase
wordCounts.foreachRDD(rdd => {
      println("=============save hbase==================")
      println("rdd : " + rdd.first())
      println("rdd count : " + rdd.count())
      if(!rdd.isEmpty()){
        rdd.foreachPartition(partitionRecodes => {
          val configuration = HBaseConfiguration.create()
          configuration.set("hbase.zookeeper.property.clientPort", "2181")
          configuration.set("hbase.zookeeper.quorum", "192.168.xxx.xxx")
          configuration.set("hbase.master", "192.168.xxx.xxx:60000")
          val connection = ConnectionFactory.createConnection(configuration)
          val tableName = TableName.valueOf("tableName")
          val table = connection.getTable(tableName)
          partitionRecodes.foreach(data => {
            println("data : " + data)
             try {
               val put = new Put(Bytes.toBytes(String.valueOf(System.currentTimeMillis()).reverse+data._1))
               put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("word"), Bytes.toBytes(data._1))
               put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("count"), Bytes.toBytes(data._2))
               table.put(put)
             } catch {
               case e :Exception => e.printStackTrace()
             } finally {
               table.close()
             }
          })
        })
      }
    })

    ssc.start()
    ssc.awaitTermination()
  }
}

各位大神,我执行上述代码时,无法将数据插入到hbase中的tableName表中,但是数据是有的。
是代码有问题,还是其他的问题,还请大神多多指教。 1.png
2.png

日志

日志




已有(3)人评论

跳转到指定楼层
tntzbzc 发表于 2017-11-15 16:28:12
先定位错误,48到50行,是哪几行?
回复

使用道具 举报

xiaobaiyang 发表于 2017-11-15 20:24:00
tntzbzc 发表于 2017-11-15 16:28
先定位错误,48到50行,是哪几行?

是这几行:
println("rdd : " + rdd.first())
println("rdd count : " + rdd.count())
if(!rdd.isEmpty()){
回复

使用道具 举报

NEOGX 发表于 2017-11-15 21:45:28
xiaobaiyang 发表于 2017-11-15 20:24
是这几行:
println("rdd : " + rdd.first())
println("rdd count : " + rdd.count())

这段代码:  
println("rdd : " + rdd.first())
      println("rdd count : " + rdd.count())
放到这里面
[mw_shl_code=scala,true]  if(!rdd.isEmpty())
{
println("rdd : " + rdd.first())
      println("rdd count : " + rdd.count())
}[/mw_shl_code]

回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条