分享

MapReduce生成HFile入库到HBase

问题导读:
1.MapReduce生成HFile入库到HBase有什么优点?
2.MapReduce生成HFile入库到HBase存在什么缺点?
3.如何生成HFile?






一、这种方式有很多的优点:

1. 如果我们一次性入库hbase巨量数据,处理速度慢不说,还特别占用Region资源, 一个比较高效便捷的方法就是使用 “Bulk Loading”方法,即HBase提供的HFileOutputFormat类。

2. 它是利用hbase的数据信息按照特定格式存储在hdfs内这一原理,直接生成这种hdfs内存储的数据格式文件,然后上传至合适位置,即完成巨量数据快速入库的办法。配合mapreduce完成,高效便捷,而且不占用region资源,增添负载。

二、这种方式也有很大的限制:

1. 仅适合初次数据导入,即表内数据为空,或者每次入库表内都无数据的情况。

2. HBase集群与Hadoop集群为同一集群,即HBase所基于的HDFS为生成HFile的MR的集群(额,咋表述~~~)

三、接下来一个demo,简单介绍整个过程。

1. 生成HFile部分
  1. package zl.hbase.mr;
  2. import java.io.IOException;
  3. import org.apache.hadoop.conf.Configuration;
  4. import org.apache.hadoop.fs.Path;
  5. import org.apache.hadoop.hbase.KeyValue;
  6. import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
  7. import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
  8. import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
  9. import org.apache.hadoop.hbase.mapreduce.SimpleTotalOrderPartitioner;
  10. import org.apache.hadoop.hbase.util.Bytes;
  11. import org.apache.hadoop.io.LongWritable;
  12. import org.apache.hadoop.io.Text;
  13. import org.apache.hadoop.mapreduce.Job;
  14. import org.apache.hadoop.mapreduce.Mapper;
  15. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  16. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  17. import org.apache.hadoop.util.GenericOptionsParser;
  18. import zl.hbase.util.ConnectionUtil;
  19. public class HFileGenerator {
  20.     public static class HFileMapper extends
  21.             Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {
  22.         @Override
  23.         protected void map(LongWritable key, Text value, Context context)
  24.                 throws IOException, InterruptedException {
  25.             String line = value.toString();
  26.             String[] items = line.split(",", -1);
  27.             ImmutableBytesWritable rowkey = new ImmutableBytesWritable(
  28.                     items[0].getBytes());
  29.             KeyValue kv = new KeyValue(Bytes.toBytes(items[0]),
  30.                     Bytes.toBytes(items[1]), Bytes.toBytes(items[2]),
  31.                     System.currentTimeMillis(), Bytes.toBytes(items[3]));
  32.             if (null != kv) {
  33.                 context.write(rowkey, kv);
  34.             }
  35.         }
  36.     }
  37.     public static void main(String[] args) throws IOException,
  38.             InterruptedException, ClassNotFoundException {
  39.         Configuration conf = new Configuration();
  40.         String[] dfsArgs = new GenericOptionsParser(conf, args)
  41.                 .getRemainingArgs();
  42.         Job job = new Job(conf, "HFile bulk load test");
  43.         job.setJarByClass(HFileGenerator.class);
  44.         job.setMapperClass(HFileMapper.class);
  45.         job.setReducerClass(KeyValueSortReducer.class);
  46.         job.setMapOutputKeyClass(ImmutableBytesWritable.class);
  47.         job.setMapOutputValueClass(Text.class);
  48.         job.setPartitionerClass(SimpleTotalOrderPartitioner.class);
  49.         FileInputFormat.addInputPath(job, new Path(dfsArgs[0]));
  50.         FileOutputFormat.setOutputPath(job, new Path(dfsArgs[1]));
  51.         HFileOutputFormat.configureIncrementalLoad(job,
  52.                 ConnectionUtil.getTable());
  53.         System.exit(job.waitForCompletion(true) ? 0 : 1);
  54.     }
  55. }
复制代码


生成HFile程序说明:

①. 最终输出结果,无论是map还是reduce,输出部分key和value的类型必须是: < ImmutableBytesWritable, KeyValue>或者< ImmutableBytesWritable, Put>。

②. 最终输出部分,Value类型是KeyValue 或Put,对应的Sorter分别是KeyValueSortReducer或PutSortReducer。

③. MR例子中job.setOutputFormatClass(HFileOutputFormat.class); HFileOutputFormat只适合一次对单列族组织成HFile文件。

④. MR例子中HFileOutputFormat.configureIncrementalLoad(job, table);自动对job进行配置。SimpleTotalOrderPartitioner是需要先对key进行整体排序,然后划分到每个reduce中,保证每一个reducer中的的key最小最大值区间范围,是不会有交集的。因为入库到HBase的时候,作为一个整体的Region,key是绝对有序的。

⑤. MR例子中最后生成HFile存储在HDFS上,输出路径下的子目录是各个列族。如果对HFile进行入库HBase,相当于move HFile到HBase的Region中,HFile子目录的列族内容没有了。

2. HFile入库到HBase
  1. package zl.hbase.bulkload;
  2. import org.apache.hadoop.fs.Path;
  3. import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
  4. import org.apache.hadoop.util.GenericOptionsParser;
  5. import zl.hbase.util.ConnectionUtil;
  6. public class HFileLoader {
  7.     public static void main(String[] args) throws Exception {
  8.         String[] dfsArgs = new GenericOptionsParser(
  9.                 ConnectionUtil.getConfiguration(), args).getRemainingArgs();
  10.         LoadIncrementalHFiles loader = new LoadIncrementalHFiles(
  11.                 ConnectionUtil.getConfiguration());
  12.         loader.doBulkLoad(new Path(dfsArgs[0]), ConnectionUtil.getTable());
  13.     }
  14. }
复制代码

通过HBase中 LoadIncrementalHFiles的doBulkLoad方法,对生成的HFile文件入库






已有(13)人评论

跳转到指定楼层
zhanggl 发表于 2014-8-24 20:05:34
楼主你的 这个包下面的 import zl.hbase.util.ConnectionUtil;  ConnectionUtil类呢
回复

使用道具 举报

leeworld88 发表于 2014-9-5 11:33:08
谢谢 大牛 。 好像即使表里有数据 也还是可以用HFile bulk load 方法。
回复

使用道具 举报

buildhappy 发表于 2014-10-20 09:59:49
你用的HBase是什么版本的啊?
我的HBase0.98.5在调用context.write()时,提示错误信息说Put类或KeyValue类没有实现Writable接口。
我查了一下Hbase0.98的api,Put类确实没有实现Writable接口。0.94中的Put类就实现了Writable接口。

请问这种情况该肿么破啊?谢谢
回复

使用道具 举报

sstutu 发表于 2014-10-20 10:19:26
buildhappy 发表于 2014-10-20 09:59
你用的HBase是什么版本的啊?
我的HBase0.98.5在调用context.write()时,提示错误信息说Put类或KeyValue类 ...

这里有有hadoop相关包,你hadoop的也导入进来
hadoop匹配版本查看
hadoop、hbase、hive版本对应关系查找
回复

使用道具 举报

buildhappy 发表于 2014-10-20 10:55:41
sstutu 发表于 2014-10-20 10:19
这里有有hadoop相关包,你hadoop的也导入进来
hadoop匹配版本查看
hadoop、hbase、hive版本对应关系查 ...

我的意思是新版本的Hbase对Put类和KeyValue类的定义做了改变,二者都没有实现Writable接口,而Context类中的write方法参数要求为:write(ImmutableBytesWritable key, Writable value),显然例子中的Line39调用write方法不太合理啊!
我的集群式HBase0.98.5+Hadoop1.2.1没有不兼容问题(Pig2同学的"hadoop、hbase、hive版本对应关系查找表"帖子中也提到该组合)


不知道问题描述清楚没有~~~~


我看到的HBase0.95源码(maven repository提供的最早的HBase版本)其中的Put也没有实现Writable接口。


我在程序中自己定义了一个Put类,实现了Writable接口,以上程序可以跑的通了。



是不是现在用Mapreduce从文件中读取数据存到HBase中,要用其他的方法啊?
PS:类似的问题在《HBase权威指南中文版》的P286中也遇到过。


回复

使用道具 举报

sstutu 发表于 2014-10-20 18:37:40
buildhappy 发表于 2014-10-20 10:55
我的意思是新版本的Hbase对Put类和KeyValue类的定义做了改变,二者都没有实现Writable接口,而Context类 ...
这个解决方式有很多种:

解决方法1:引入hadoop包
Hadoop自带一系列有用的Writable实现
26424461_1.png




解决方法2:
hadoop:hadoop-2.2.0
hbase:hbase-0.96.0
1.org.apache.hadoop.hbase.client.Put
    <1>取消了无参的构造方法
    <2>Put类不再继承Writable类     
        0.94.6时public class Put extends Mutation implements HeapSize, Writable, Comparable<Row>
        0.96.0时public class Put extends Mutation implements HeapSize, Comparable<Row>
解决方法:
        由public class MonthUserLoginTimeIndexReducer extends Reducer<BytesWritable,MonthUserLoginTimeIndexWritable, ImmutableBytesWritable, Writable> {
改public class MonthUserLoginTimeIndexReducer extends Reducer<BytesWritable,MonthUserLoginTimeIndexWritable, ImmutableBytesWritable, Put> {


解决方法3

通过自定义Writable




回复

使用道具 举报

buildhappy 发表于 2014-10-20 19:11:29
恩  万分感谢  
但是第一种方法还是不太明白,能稍微详细的介绍一下嘛?谢谢
回复

使用道具 举报

buildhappy 发表于 2014-10-20 19:12:58
sstutu 发表于 2014-10-20 18:37
这个解决方式有很多种:

解决方法1:引入hadoop包

恩  万分感谢  
但是第一种方法还是不太明白,能稍微详细的介绍一下嘛?谢谢

回复

使用道具 举报

sstutu 发表于 2014-10-20 19:41:36
自定义和系统都是差不多的
在运行的时候,系统会去自动找
可以这样想,hadoop的类,其实也是作者写的,自己定义的,只不过是对作者原先类的修改。

可以把引入的hadoop类,当作是自己写的类
回复

使用道具 举报

12下一页
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条