分享

如何用spark生成Hfile 写入hbase (延伸)

remarkzhao 发表于 2018-2-27 10:52:29 [显示全部楼层] 只看大图 回帖奖励 阅读模式 关闭右栏 18 15595
本帖最后由 remarkzhao 于 2018-2-27 10:55 编辑

val conf = HBaseConfiguration.create();val tableName = "data1"
val table = new HTable(conf,tableName)
conf.set(TableOutputFormat.OUTPUT_TABLE,tableName)
lazy val job = Job.getInstance(conf)
job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
job.setMapOutputValueClass(classOf[KeyValue])
HFileOutputFormat.configureIncrementalLoad(job,table)
val rdd = sc.textFile("/data/produce/2015/2015-03-01.log").map(_.split("@")).map{x => (DigestUtils.md5Hex(x(0)+x(1)).substring(0,3)+x(0)+x(1),x(2))}.sortBy(x =>x._1).map{x=>{val kv:KeyValue = new KeyValue(Bytes.toBytes(x._1),Bytes.toBytes("v"),Bytes.toBytes("value"),Bytes.toBytes(x._2+""));(new ImmutableBytesWritable(kv.getKey),kv)}}
rdd.saveAsNewAPIHadoopFile("/tmp/data1",classOf[ImmutableBytesWritable],classOf[KeyValue],classOf[HFileOutputFormat],job.getConfiguration())
val bulkLoader = new LoadIncrementalHFiles(conf)
bulkLoader.doBulkLoad(new Path("/tmp/data1"),table)


这段代码是网上最常见的,通常只有一个rowkey,一个列族,一个列,一个列值,然后构建一个键值对生成RDD,那么问题来了,如果我的数据要求是一个rowkey,一个列族,多个列族,多个列,多个列值如何构建?

已有(18)人评论

跳转到指定楼层
langke93 发表于 2018-2-27 13:26:48
一个列和多个列是一样的,你只要拼接字符串就可以了。比如value原先是列名+值,如果多列可以value=名+值,名+值。
本身hbase列是可以增加和减少的
回复

使用道具 举报

remarkzhao 发表于 2018-2-27 14:04:15
langke93 发表于 2018-2-27 13:26
一个列和多个列是一样的,你只要拼接字符串就可以了。比如value原先是列名+值,如果多列可以value=名+值, ...

知道是知道 但是想象不出来 能否写一小段代码看看。我的源数据是从关系型数据库读出来的dataSet,要循环插入到hbase ,有些列是空值。
回复

使用道具 举报

hello2019 发表于 2018-2-27 16:35:12
remarkzhao 发表于 2018-2-27 14:04
知道是知道 但是想象不出来 能否写一小段代码看看。我的源数据是从关系型数据库读出来的dataSet,要循环 ...

val kv:KeyValue = new KeyValue(Bytes.toBytes(x._1),Bytes.toBytes("v"),Bytes.toBytes("value"),Bytes.toBytes(x._2+"")
关键代码在上面

Bytes.toBytes(x._1)应该是rowkey,v为列簇,value为列名,Bytes.toBytes(x._2为列值

上面的关键其实你的分隔符来确定的。如果你有多个值,应该还可以继续往下面写
val kv:KeyValue = new KeyValue(Bytes.toBytes(x._1),Bytes.toBytes("v"),Bytes.toBytes("value"),Bytes.toBytes(x._2+""),Bytes.toBytes("第二列"),Bytes.toBytes(x._3+"")
然后后面的代码

回复

使用道具 举报

hello2019 发表于 2018-2-27 16:37:48
remarkzhao 发表于 2018-2-27 14:04
知道是知道 但是想象不出来 能否写一小段代码看看。我的源数据是从关系型数据库读出来的dataSet,要循环 ...

如果你是循环插入,那就更简单了。
直接使用hbase api即可,指定rowkey,列簇,列名,列值。下面仅供参考
[mw_shl_code=bash,true]import org.apache.spark._  
import org.apache.spark.rdd.NewHadoopRDD  
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}  
import org.apache.hadoop.hbase.client.HBaseAdmin  
import org.apache.hadoop.hbase.mapreduce.TableInputFormat  
import org.apache.hadoop.fs.Path;  
import org.apache.hadoop.hbase.HColumnDescriptor  
import org.apache.hadoop.hbase.util.Bytes  
import org.apache.hadoop.hbase.client.Put;  
import org.apache.hadoop.hbase.client.HTable;  
   
val conf = HBaseConfiguration.create()  
val tableName = "/t1"  
conf.set(TableInputFormat.INPUT_TABLE, tableName)  
   
val myTable = new HTable(conf, tableName);  
var p = new Put();  
p = new Put(new String("row999").getBytes());  
p.add("cf".getBytes(), "column_name".getBytes(), new String("value999").getBytes());  
myTable.put(p);  
myTable.flushCommits();  [/mw_shl_code]
回复

使用道具 举报

remarkzhao 发表于 2018-2-27 18:06:06
hello2019 发表于 2018-2-27 16:35
val kv:KeyValue = new KeyValue(Bytes.toBytes(x._1),Bytes.toBytes("v"),Bytes.toBytes("value"),Bytes ...

额。其实我是卡在这了。。put我会写的,但是效率太慢

val rdd = dataSet.rdd  //dataSet是关系型数据库读出来的
  .map{x => if(x.getAs("PATIENT_ID") != null && x.getAs("EXAM_RESULT_ID") != null && x.getAs("ENCOUNTER_ID") != null){
    val partition = StringUtils.leftPad(Integer.toString(Math.abs(x.getAs("PATIENT_ID").hashCode % 10)), 1, '0')
    val rowKey = partition + "|" + x.getAs("PATIENT_ID").toString() + "|" +
      x.getAs("ENCOUNTER_ID").toString() + "|" + x.getAs("EXAM_RESULT_ID").toString() //拼rowKey
                  for (i <- 0 to x.length - 1) {
                    if (x.getAs(i) != null) {
                      val kv:KeyValue = new KeyValue(Bytes.toBytes(rowKey),Bytes.toBytes("f"),Bytes.toBytes(schema(i)),Bytes.toBytes(x.getAs(i).toString));
                      println(rowKey + " f." + schema(i) + ":" + x.getAs(i)); //这一段始终不会写,不知                        把keyValue放哪。
                     
                    }

                  }
  }
  }
回复

使用道具 举报

remarkzhao 发表于 2018-2-27 18:16:53
列数大概100来列。。
回复

使用道具 举报

nextuser 发表于 2018-2-28 06:39:38
remarkzhao 发表于 2018-2-27 18:16
列数大概100来列。。

你这是要打印吗?
把rowkey放到for循环的外面,放里面每打印一个列,就会打印一个rowkey

回复

使用道具 举报

remarkzhao 发表于 2018-2-28 10:55:34
nextuser 发表于 2018-2-28 06:39
你这是要打印吗?
把rowkey放到for循环的外面,放里面每打印一个列,就会打印一个rowkey

多谢。 但是打印只是我想在调试的时候看到控制台看到它们。我不会写的是如何将rowkey多列及列值写到一个kv对象里
回复

使用道具 举报

nextuser 发表于 2018-2-28 12:22:46
remarkzhao 发表于 2018-2-28 10:55
多谢。 但是打印只是我想在调试的时候看到控制台看到它们。我不会写的是如何将rowkey多列及列值写到一个k ...

new KeyValue(Bytes.toBytes(rowKey),Bytes.toBytes("f"),Bytes.toBytes(schema(i)),Bytes.toBytes(x.getAs(i).toString));
上面写的一个吧,其实你使用的for循环,就已经达到了目的。
都放到一个里面没有太大的价值,如果真想放,那你可以把Bytes.toBytes(schema(i))这个替换为一个字符串变量columns
columns=col1+","+col2+","+col3。。。。。。
然后值也一样
Bytes.toBytes(x.getAs(i).toString)把替换为值,然后
columnsValue=colValue1+","+colValue2+","+colValue3。。。。。。

这样在放到里面。
当然最后你需要解析上面字符串。

回复

使用道具 举报

12下一页
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条