分享

hbase MapReduce程序样例介绍及入门

pig2 2014-8-8 19:21:41 发表于 介绍解说 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 6 18548

问题导读
1.hbase作为数据读取源和输出源样例模板-包含那些过程?
2.通过下例你认为hdfs读入hbase有几种方式?
3.从hbase中的表作为数据源读取,hdfs作为数据输出,你认为的思路是什么?







1、先看一个标准的hbase作为数据读取源和输出源的样例:
  1. Configuration conf = HBaseConfiguration.create();
  2. Job job = new Job(conf, "job name ");
  3. job.setJarByClass(test.class);
  4. Scan scan = new Scan();
  5. TableMapReduceUtil.initTableMapperJob(inputTable, scan, mapper.class,Writable.class, Writable.class, job);
  6. TableMapReduceUtil.initTableReducerJob(outputTable, reducer.class, job);
  7. job.waitForCompletion(true);
复制代码

首先创建配置信息和作业对象,设置作业的类。这些和正常的mapreduce一样,唯一不一样的就是数据源的说明部分,TableMapReduceUtil的initTableMapperJob和initTableReducerJob方法来实现。

用如上代码:
数据输入源是hbase的inputTable表,执行mapper.class进行map过程,输出的key/value类型是 ImmutableBytesWritable和Put类型,最后一个参数是作业对象。需要指出的是需要声明一个扫描读入对象scan,进行表扫描读取数 据用,其中scan可以配置参数,这里为了例子简单不再详述。

数据输出目标是hbase的outputTable表,输出执行的reduce过程是reducer.class类,操作的作业目标是job。与map比 缺少输出类型的标注,因为他们不是必要的,看过源代码就知道mapreduce的TableRecordWriter中write(key,value) 方法中,key值是没有用到的。value只能是Put或者Delete两种类型,write方法会自行判断并不用用户指明。

接下来就是mapper类:

  1. public class mapper extends
  2.                 TableMapper<KEYOUT, VALUEOUT> {
  3.         public void map(Writable key, Writable value, Context context)
  4.                         throws IOException, InterruptedException {
  5.                         //mapper逻辑
  6.                         context.write(key, value);
  7.                 }
  8.         }
  9. }
复制代码

继承的是hbase中提供的TableMapper类,其实这个类也是继承的MapReduce类。后边跟的两个泛型参数指定类型是mapper输 出的数据类型,该类型必须继承自Writable类,例如可能用到的put和delete就可以。需要注意的是要和initTableMapperJob 方法指定的数据类型一直。该过程会自动从指定hbase表内一行一行读取数据进行处理。

然后reducer类:

  1. public class countUniteRedcuer extends
  2.                 TableReducer<KEYIN, VALUEIN, KEYOUT> {
  3.         public void reduce(Text key, Iterable<VALUEIN> values, Context context)
  4.                         throws IOException, InterruptedException {
  5.                 //reducer逻辑
  6.                 context.write(null, put or delete);
  7.         }
  8. }
复制代码

reducer继承的是TableReducer类。后边指定三个泛型参数,前两个必须对应map过程的输出key/value类型,第三个必须是 put或者delete。write的时候可以把key写null,它是不必要的。这样reducer输出的数据会自动插入outputTable指定的 表内。

2、有时候我们需要数据源是hdfs的文本,输出对象是hbase。这时候变化也很简单:

  1. Configuration conf = HBaseConfiguration.create();
  2. Job job = new Job(conf, "job name ");
  3. job.setJarByClass(test.class);
  4. job.setMapperClass(mapper.class);
  5. job.setMapOutputKeyClass(Text.class);
  6. job.setMapOutputValueClass(LongWritable.class);
  7. FileInputFormat.setInputPaths(job, path);
  8. TableMapReduceUtil.initTableReducerJob(tableName,reducer.class, job);
复制代码

你会发现只需要像平常的mapreduce的作业声明过程一样,指定mapper的执行类和输出key/value类型,指定 FileInputFormat.setInputPaths的数据源路径,输出声明不变。便完成了从hdfs文本读取数据输出到hbase的命令声明过 程。 mapper和reducer如下:
  1. public class mapper extends Mapper<LongWritable,Writable,Writable,Writable> {
  2.         public void map(LongWritable key, Text line, Context context) {
  3.                  //mapper逻辑
  4.                  context.write(k, one);
  5.         }
  6. }
  7. public class redcuer extends
  8.                 TableReducer<KEYIN, VALUEIN, KEYOUT> {
  9.         public void reduce(Writable key, Iterable<Writable> values, Context context)
  10.                         throws IOException, InterruptedException {
  11.                 //reducer逻辑
  12.                 context.write(null, put or delete);
  13.         }
  14. }
复制代码

mapper还依旧继承原来的MapReduce类中的Mapper即可。同样注意这前后数据类型的key/value一直性。

3、最后就是从hbase中的表作为数据源读取,hdfs作为数据输出,简单的如下:

  1. Configuration conf = HBaseConfiguration.create();
  2. Job job = new Job(conf, "job name ");
  3. job.setJarByClass(test.class);
  4. Scan scan = new Scan();
  5. TableMapReduceUtil.initTableMapperJob(inputTable, scan, mapper.class,
  6.                 Writable.class, Writable.class, job);
  7. job.setOutputKeyClass(Writable.class);
  8. job.setOutputValueClass(Writable.class);
  9. FileOutputFormat.setOutputPath(job, Path);
  10. job.waitForCompletion(true);
复制代码

mapper和reducer简单如下:
  1. public class mapper extends
  2.                 TableMapper<KEYOUT, VALUEOUT>{
  3.         public void map(Writable key, Writable value, Context context)
  4.                         throws IOException, InterruptedException {
  5.                         //mapper逻辑
  6.                         context.write(key, value);
  7.                 }
  8.         }
  9. }
  10. public class reducer extends
  11.                 Reducer<Writable,Writable,Writable,Writable>  {
  12.         public void reducer(Writable key, Writable value, Context context)
  13.                         throws IOException, InterruptedException {
  14.                         //reducer逻辑
  15.                         context.write(key, value);
  16.                 }
  17.         }
  18. }
复制代码



最后说一下TableMapper和TableReducer的本质,其实这俩类就是为了简化一下书写代码,因为传入的4个泛型参数里都会有固定的参数类型,所以是Mapper和Reducer的简化版本,本质他们没有任何区别。源码如下:
  1. public abstract class TableMapper<KEYOUT, VALUEOUT>
  2. extends Mapper<ImmutableBytesWritable, Result, KEYOUT, VALUEOUT> {
  3. }
  4. public abstract class TableReducer<KEYIN, VALUEIN, KEYOUT>
  5. extends Reducer<KEYIN, VALUEIN, KEYOUT, Writable> {
  6. }
复制代码




已有(6)人评论

跳转到指定楼层
break-spark 发表于 2014-10-14 14:02:25
没有看太明白,以后再看
回复

使用道具 举报

ruguoaaa 发表于 2014-11-7 18:12:10
谢谢楼主,收藏了
回复

使用道具 举报

mingge12321 发表于 2014-11-8 22:11:07
好好,不错的资料
回复

使用道具 举报

jxlhljh 发表于 2014-12-9 15:31:51
好好学习天天向上
回复

使用道具 举报

EASONLIU 发表于 2014-12-17 10:25:31
路过,学习学习
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条