分享

如何用spark生成Hfile 写入hbase (延伸)

remarkzhao 发表于 2018-2-27 10:52:29 [显示全部楼层] 只看大图 回帖奖励 阅读模式 关闭右栏 18 15652
remarkzhao 发表于 2018-2-28 17:54:24
nextuser 发表于 2018-2-28 12:22
new KeyValue(Bytes.toBytes(rowKey),Bytes.toBytes("f"),Bytes.toBytes(schema(i)),Bytes.toBytes(x.get ...

解析我觉得倒不需要因为是kv对象存在hbase里,等以后读取的时候再解析。
回复

使用道具 举报

remarkzhao 发表于 2018-2-28 17:58:27
nextuser 发表于 2018-2-28 12:22
new KeyValue(Bytes.toBytes(rowKey),Bytes.toBytes("f"),Bytes.toBytes(schema(i)),Bytes.toBytes(x.get ...

应该就是放在一个里面吧   dataSet是关系型数据库读出来的数据集。  dataSet.rdd.map 就是对dataSet的每条记录进行键值对映射,键值对应该就是rowkey(key),value(col1:value1,col2:value2,……)  然后 把返回的RDD进行save  最后再hfile存进hbase
我一会儿添个全部代码。。
回复

使用道具 举报

remarkzhao 发表于 2018-2-28 20:47:38
langke93 发表于 2018-2-27 13:26
一个列和多个列是一样的,你只要拼接字符串就可以了。比如value原先是列名+值,如果多列可以value=名+值, ...

rdd.png


这样就不行了。。。

代码:

val rdd =
  dataSet.map (x =>
      if (x.getAs("PATIENT_ID") != null && x.getAs("EXAM_RESULT_ID") != null && x.getAs("ENCOUNTER_ID") != null) {
        val partition = StringUtils.leftPad(Integer.toString(Math.abs(x.getAs("PATIENT_ID").hashCode % 10)), 1, '0')
        val rowKey = partition + "|" + x.getAs("PATIENT_ID").toString() + "|" +
          x.getAs("ENCOUNTER_ID").toString() + "|" + x.getAs("EXAM_RESULT_ID").toString();
          for (i <- 0 to x.length - 1) {
          if (x.getAs(i) != null) {
            val kv: KeyValue = new KeyValue(Bytes.toBytes(rowKey),
              Bytes.toBytes("f"), Bytes.toBytes(schema(i)), Bytes.toBytes(x.getAs(i).toString));
            (new ImmutableBytesWritable(kv.getKey), kv)
          }
        }
      }
  )

rdd.
saveAsNewAPIHadoopFile("/tmp/data1", classOf[ImmutableBytesWritable],
  classOf[KeyValue], classOf[HFileOutputFormat], job.getConfiguration());
rdd不认识了。格式问题吗?还是写的kv就不对?
回复

使用道具 举报

remarkzhao 发表于 2018-2-28 21:47:15
langke93 发表于 2018-2-27 13:26
一个列和多个列是一样的,你只要拼接字符串就可以了。比如value原先是列名+值,如果多列可以value=名+值, ...

rdd1.png
改了一下 但是还是有点不行。。。。

val rdd =  dataSet.rdd.map { x =>
    if (x.getAs("PATIENT_ID") != null && x.getAs("EXAM_RESULT_ID") != null && x.getAs("ENCOUNTER_ID") != null) {
      val partition = StringUtils.leftPad(Integer.toString(Math.abs(x.getAs("PATIENT_ID").hashCode % 10)), 1, '0')
      val rowKey = partition + "|" + x.getAs("PATIENT_ID").toString() + "|" +
        x.getAs("ENCOUNTER_ID").toString() + "|" + x.getAs("EXAM_RESULT_ID").toString();
      var str = ""
      for (i <- 0 to x.length - 1) {
        if (x.getAs(i) != null) {
          str = str + schema(i) + x.getAs(i)
        }
      }
      (rowKey, str)
    }
  }.map { x => {
    val kv: KeyValue =
      new KeyValue(Bytes.toBytes(x._1),
        Bytes.toBytes("v"), Bytes.toBytes("value"),
        Bytes.toBytes(x._2 + ""));
    (new ImmutableBytesWritable(kv.getKey), kv)
  }
  }
应该改哪里。。。
回复

使用道具 举报

sstutu 发表于 2018-3-1 16:29:18
remarkzhao 发表于 2018-2-28 21:47
改了一下 但是还是有点不行。。。。

val rdd =  dataSet.rdd.map { x =>

思路有点乱,应该受到了原贴的影响。其实楼主的思路跟下面,是完全不一样的思路。就不需要使用KeyValue
[mw_shl_code=scala,true]val conf = HBaseConfiguration.create();val tableName = "data1"
val table = new HTable(conf,tableName)
conf.set(TableOutputFormat.OUTPUT_TABLE,tableName)
lazy val job = Job.getInstance(conf)
job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
job.setMapOutputValueClass(classOf[KeyValue])
HFileOutputFormat.configureIncrementalLoad(job,table)
val rdd = sc.textFile("/data/produce/2015/2015-03-01.log").map(_.split("@")).map{x => (DigestUtils.md5Hex(x(0)+x(1)).substring(0,3)+x(0)+x(1),x(2))}.sortBy(x =>x._1).map{x=>{val kv:KeyValue = new KeyValue(Bytes.toBytes(x._1),Bytes.toBytes("v"),Bytes.toBytes("value"),Bytes.toBytes(x._2+""));(new ImmutableBytesWritable(kv.getKey),kv)}}
rdd.saveAsNewAPIHadoopFile("/tmp/data1",classOf[ImmutableBytesWritable],classOf[KeyValue],classOf[HFileOutputFormat],job.getConfiguration())
val bulkLoader = new LoadIncrementalHFiles(conf)
bulkLoader.doBulkLoad(new Path("/tmp/data1"),table)[/mw_shl_code]
   println(rowKey + " f." + schema(i) + ":" + x.getAs(i));,在这里你已经获取了他们的值,为何还需要使用keyvalue。你获取值之后,直接使用put插入就可以了。所以放弃keyvalue,因为你根本不需要。只有在使用saveAsNewAPIHadoopFile的时候,才需要使用。
#####
如果你使用了saveAsNewAPIHadoopFile。那么肯定是使用的下面初始化函数

KeyValue(byte[] row, byte[] family, byte[] qualifier, byte[] value),也就是如下:

KeyValue(Bytes.toBytes(x._1),Bytes.toBytes("v"),Bytes.toBytes("value"),Bytes.toBytes(x._2+""));

Bytes.toBytes(x._1)是row,Bytes.toBytes("v")是列簇,Bytes.toBytes("value")这里应该是列名, Bytes.toBytes(x._2+"")为value值。

  new KeyValue(Bytes.toBytes(x._1),
        Bytes.toBytes("v"), Bytes.toBytes("value"),
        Bytes.toBytes(x._2 + ""));


对于你上面代码,自己核实下。
Bytes.toBytes(x._1)是否为row, Bytes.toBytes("v")是否为列簇,Bytes.toBytes("value")是否为列名, Bytes.toBytes(x._2 + ""))是否为列值。如果都是那就初始化成功了,你直接使用即可。如果你有多个列,那么这里你Bytes.toBytes("value"),替换为   Bytes.toBytes(schema(i)),感觉没有错误,说不识别是出现了什么问题??具体错误看下。
回复

使用道具 举报

remarkzhao 发表于 2018-3-1 19:59:46
sstutu 发表于 2018-3-1 16:29
思路有点乱,应该受到了原贴的影响。其实楼主的思路跟下面,是完全不一样的思路。就不需要使用KeyValue
...

我的多列有些列的值是空的。所以要进行判断。。
这是我目前写的,我也不知道该怎么写下去了。

val rdd = dataSet.rdd.map { x =>
  if (x.getAs("PATIENT_ID") != null && x.getAs("EXAM_RESULT_ID") != null && x.getAs("ENCOUNTER_ID") != null) {
    val partition = StringUtils.leftPad(Integer.toString(Math.abs(x.getAs("PATIENT_ID").hashCode % 10)), 1, '0')
    val rowKey = partition + "|" + x.getAs("PATIENT_ID").toString() + "|" +
      x.getAs("ENCOUNTER_ID").toString() + "|" + x.getAs("EXAM_RESULT_ID").toString();
    var str = ""
    for (i <- 0 to x.length - 1) {
      if (x.getAs(i) != null) {
        str = str + schema(i) + x.getAs(i)
      }
    }
    (rowKey, str)
  }
}.map { x => {
  for ((k, v) <- x)
  val kv: KeyValue =
    new KeyValue((x.getKey),
      Bytes.toBytes("v"), Bytes.toBytes("value"),
      Bytes.toBytes(x._2 + ""));
  (new ImmutableBytesWritable(kv.getKey), kv)
}
}
回复

使用道具 举报

remarkzhao 发表于 2018-3-1 20:23:52
sstutu 发表于 2018-3-1 16:29
思路有点乱,应该受到了原贴的影响。其实楼主的思路跟下面,是完全不一样的思路。就不需要使用KeyValue
...

哎,,错误就是不会写……
回复

使用道具 举报

remarkzhao 发表于 2018-3-1 20:28:45
sstutu 发表于 2018-3-1 16:29
思路有点乱,应该受到了原贴的影响。其实楼主的思路跟下面,是完全不一样的思路。就不需要使用KeyValue
...

我不用put的原因是速度太慢。效率太低了。。。所以先生成Hfile,再bulkload到hbase。
回复

使用道具 举报

July_6329 发表于 2018-7-11 16:05:32
你好,请问你解决了吗?是怎么解决的呢,能贴一下代码不?
回复

使用道具 举报

12
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条