分享

HBase 5种写入数据方式

本帖最后由 howtodown 于 2014-4-20 14:04 编辑
问题导读:
1.如何直接使用HTable进行导入?
2.如何从HDFS文件导入HBase,继承自Mapper?
3.如何读取HBase表写入HBase表中字段?
4.如何让MR和HTable结合?






Version :hadoop1.2.1; hbaes0.94.16;

HBase写入数据方式(参考:《HBase The Definitive Guide》),可以简单分为下面几种:

1. 直接使用HTable进行导入,代码如下:

  1. package hbase.curd;
  2. import java.io.IOException;
  3. import java.util.ArrayList;
  4. import java.util.List;
  5. import java.util.Random;
  6. import org.apache.hadoop.hbase.client.HTable;
  7. import org.apache.hadoop.hbase.client.Put;
  8. import org.apache.hadoop.hbase.util.Bytes;
  9. public class PutExample {
  10.         /**
  11.          * @param args
  12.          * @throws IOException
  13.          */
  14.         private HTable  table = HTableUtil.getHTable("testtable");
  15.         public static void main(String[] args) throws IOException {
  16.                 // TODO Auto-generated method stub
  17.                 PutExample pe = new PutExample();
  18.                 pe.putRows();
  19.                
  20.         }
  21.         
  22.         public void putRows(){
  23.                 List<Put> puts = new ArrayList<Put>();
  24.                 for(int i=0;i<10;i++){
  25.                         Put put = new Put(Bytes.toBytes("row_"+i));
  26.                         Random random = new Random();
  27.                         
  28.                         if(random.nextBoolean()){
  29.                                 put.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual1"), Bytes.toBytes("colfam1_qual1_value_"+i));
  30.                         }
  31.                         if(random.nextBoolean()){
  32.                                 put.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual2"), Bytes.toBytes("colfam1_qual1_value_"+i));
  33.                         }
  34.                         if(random.nextBoolean()){
  35.                                 put.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual3"), Bytes.toBytes("colfam1_qual1_value_"+i));
  36.                         }
  37.                         if(random.nextBoolean()){
  38.                                 put.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual4"), Bytes.toBytes("colfam1_qual1_value_"+i));        
  39.                         }
  40.                         if(random.nextBoolean()){
  41.                                 put.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual5"), Bytes.toBytes("colfam1_qual1_value_"+i));
  42.                         }
  43.                         puts.add(put);
  44.                 }
  45.                 try{
  46.                         table.put(puts);
  47.                         table.close();
  48.                 }catch(Exception e){
  49.                         e.printStackTrace();
  50.                         return ;
  51.                 }
  52.                 System.out.println("done put rows");
  53.         }
  54. }
复制代码

其中HTableUtil如下:
  1. package hbase.curd;
  2. import java.io.IOException;
  3. import org.apache.hadoop.conf.Configuration;
  4. import org.apache.hadoop.hbase.HBaseConfiguration;
  5. import org.apache.hadoop.hbase.client.HTable;
  6. import org.apache.hadoop.hbase.util.Bytes;
  7. public class HTableUtil {
  8.         private static HTable table;
  9.         private static Configuration conf;
  10.         
  11.         static{
  12.                 conf =HBaseConfiguration.create();
  13.                 conf.set("mapred.job.tracker", "hbase:9001");
  14.                 conf.set("fs.default.name", "hbase:9000");
  15.                 conf.set("hbase.zookeeper.quorum", "hbase");
  16.                 try {
  17.                         table = new HTable(conf,"testtable");
  18.                 } catch (IOException e) {
  19.                         // TODO Auto-generated catch block
  20.                         e.printStackTrace();
  21.                 }
  22.         }
  23.         public static Configuration getConf(){
  24.                 return conf;
  25.         }
  26.         public static HTable getHTable(String tablename){
  27.                 if(table==null){
  28.                         try {
  29.                                 table= new HTable(conf,tablename);
  30.                         } catch (IOException e) {
  31.                                 // TODO Auto-generated catch block
  32.                                 e.printStackTrace();
  33.                         }
  34.                 }
  35.                 return table;
  36.         }
  37.         
  38.         public static  byte[] gB(String name){
  39.                 return Bytes.toBytes(name);
  40.         }
  41. }
复制代码

这一种是没有使用MR的,下面介绍的几种方式都是使用MR的。

2.1 从HDFS文件导入HBase,继承自Mapper,代码如下:

  1. package hbase.mr;
  2. import java.io.IOException;
  3. import hbase.curd.HTableUtil;
  4. import org.apache.commons.cli.CommandLine;
  5. import org.apache.commons.cli.CommandLineParser;
  6. import org.apache.commons.cli.HelpFormatter;
  7. import org.apache.commons.cli.Option;
  8. import org.apache.commons.cli.Options;
  9. import org.apache.commons.cli.PosixParser;
  10. import org.apache.commons.codec.digest.DigestUtils;
  11. import org.apache.hadoop.conf.Configuration;
  12. import org.apache.hadoop.fs.Path;
  13. import org.apache.hadoop.hbase.KeyValue;
  14. import org.apache.hadoop.hbase.client.Put;
  15. import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
  16. import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
  17. import org.apache.hadoop.hbase.util.Bytes;
  18. import org.apache.hadoop.io.LongWritable;
  19. import org.apache.hadoop.io.Text;
  20. import org.apache.hadoop.io.Writable;
  21. import org.apache.hadoop.mapreduce.Job;
  22. import org.apache.hadoop.mapreduce.Mapper;
  23. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  24. import org.apache.hadoop.util.GenericOptionsParser;
  25. public class ImportFromFile {
  26.         /**
  27.          * 从文件导入到HBase
  28.          * @param args
  29.          */
  30.         public static final String NAME="ImportFromFile";
  31.         public enum Counters{LINES}
  32.         
  33.         static class ImportMapper extends Mapper<LongWritable,Text,
  34.                 ImmutableBytesWritable,Writable>{
  35.                 private byte[] family =null;
  36.                 private byte[] qualifier = null;
  37.                 @Override
  38.                 protected void setup(Context cxt){
  39.                         String column = cxt.getConfiguration().get("conf.column");
  40.                         byte[][] colkey = KeyValue.parseColumn(Bytes.toBytes(column));
  41.                         family = colkey[0];
  42.                         if(colkey.length>1){
  43.                                 qualifier = colkey[1];
  44.                         }
  45.                 }
  46.                 @Override
  47.                 public void map(LongWritable offset,Text line,Context cxt){
  48.                         try{
  49.                                 String lineString= line.toString();
  50.                                 byte[] rowkey= DigestUtils.md5(lineString);
  51.                                 Put put = new Put(rowkey);
  52.                                 put.add(family,qualifier,Bytes.toBytes(lineString));
  53.                                 cxt.write(new ImmutableBytesWritable(rowkey), put);
  54.                                 cxt.getCounter(Counters.LINES).increment(1);
  55.                         }catch(Exception e){
  56.                                 e.printStackTrace();
  57.                         }
  58.                 }
  59.         }
  60.         private static CommandLine parseArgs(String[] args){
  61.                 Options options = new Options();
  62.                 Option o = new Option("t" ,"table",true,"table to import into (must exist)");
  63.                 o.setArgName("table-name");
  64.                 o.setRequired(true);
  65.                 options.addOption(o);
  66.                
  67.                 o= new Option("c","column",true,"column to store row data into");
  68.                 o.setArgName("family:qualifier");
  69.                 o.setRequired(true);
  70.                 options.addOption(o);
  71.                
  72.                 o = new Option("i", "input", true,
  73.                 "the directory or file to read from");
  74.                 o.setArgName("path-in-HDFS");
  75.                 o.setRequired(true);
  76.                 options.addOption(o);
  77.                 options.addOption("d", "debug", false, "switch on DEBUG log level");
  78.                 CommandLineParser parser = new PosixParser();
  79.                 CommandLine cmd = null;
  80.                 try {
  81.                         cmd = parser.parse(options, args);
  82.                 } catch (Exception e) {
  83.                         System.err.println("ERROR: " + e.getMessage() + "\n");
  84.                         HelpFormatter formatter = new HelpFormatter();
  85.                         formatter.printHelp(NAME + " ", options, true);
  86.                         System.exit(-1);
  87.                 }
  88.                 return cmd;
  89.         }
  90.         public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
  91.                
  92.                 Configuration conf = HTableUtil.getConf();
  93.                 String[] otherArgs = new GenericOptionsParser(conf, initialArg()).getRemainingArgs();
  94.                 CommandLine cmd = parseArgs(otherArgs);
  95.                 String table = cmd.getOptionValue("t");
  96.                 String input = cmd.getOptionValue("i");
  97.                 String column = cmd.getOptionValue("c");
  98.                 conf.set("conf.column", column);
  99.                 Job job = new Job(conf, "Import from file " + input + " into table " + table);
  100.                 job.setJarByClass(ImportFromFile.class);
  101.                 job.setMapperClass(ImportMapper.class);
  102.                 job.setOutputFormatClass(TableOutputFormat.class);
  103.                 job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, table);
  104.                 job.setOutputKeyClass(ImmutableBytesWritable.class);
  105.                 job.setOutputValueClass(Writable.class);
  106.                 job.setNumReduceTasks(0);
  107.                 FileInputFormat.addInputPath(job, new Path(input));
  108.                 System.exit(job.waitForCompletion(true) ? 0 : 1);
  109.         }
  110.         
  111.         private static String[] initialArg(){
  112.                 String []args = new String[6];
  113.                 args[0]="-c";
  114.                 args[1]="fam:data";
  115.                 args[2]="-i";
  116.                 args[3]="/user/hadoop/input/picdata";
  117.                 args[4]="-t";
  118.                 args[5]="testtable";
  119.                 return args;
  120.         }
  121. }
复制代码

2.2 读取HBase表写入HBase表中字段,代码如下:
  1. package hbase.mr;
  2. import hadoop.util.HadoopUtils;
  3. import java.io.IOException;
  4. import org.apache.hadoop.conf.Configuration;
  5. import org.apache.hadoop.hbase.KeyValue;
  6. import org.apache.hadoop.hbase.client.Put;
  7. import org.apache.hadoop.hbase.client.Result;
  8. import org.apache.hadoop.hbase.client.Scan;
  9. import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
  10. import org.apache.hadoop.hbase.mapreduce.IdentityTableReducer;
  11. import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
  12. import org.apache.hadoop.hbase.mapreduce.TableMapper;
  13. import org.apache.hadoop.hbase.util.Bytes;
  14. import org.apache.hadoop.io.Writable;
  15. import org.apache.hadoop.mapreduce.Job;
  16. import org.slf4j.Logger;
  17. import org.slf4j.LoggerFactory;
  18. public class ParseDriver {
  19.         /**
  20.          * 把hbase表中数据拷贝到其他表(或本表)相同字段
  21.          * @param args
  22.          */
  23.         enum Counters{
  24.                 VALID, ROWS, COLS, ERROR
  25.         }
  26.         private static Logger log = LoggerFactory.getLogger(ParseDriver.class);
  27.         static class ParseMapper extends TableMapper<ImmutableBytesWritable,Writable>{
  28.                 private byte[] columnFamily =null ;
  29.                 private byte[] columnQualifier =null;
  30.                 @Override
  31.                 protected void setup(Context cxt){
  32.                         columnFamily = Bytes.toBytes(cxt.getConfiguration().get("conf.columnfamily"));
  33.                         columnQualifier = Bytes.toBytes(cxt.getConfiguration().get("conf.columnqualifier"));
  34.                 }
  35.                 @Override
  36.                 public void map(ImmutableBytesWritable row,Result columns,Context cxt){
  37.                         cxt.getCounter(Counters.ROWS).increment(1);
  38.                         String value =null;
  39.                         try{
  40.                                 Put put = new Put(row.get());
  41.                                 for(KeyValue kv : columns.list()){
  42.                                         cxt.getCounter(Counters.COLS).increment(1);
  43.                                         value= Bytes.toStringBinary(kv.getValue());
  44.                                         if(equals(columnQualifier,kv.getQualifier())){  // 过滤column
  45.                                                 put.add(columnFamily,columnQualifier,kv.getValue());
  46.                                                 cxt.write(row, put);
  47.                                                 cxt.getCounter(Counters.VALID).increment(1);
  48.                                         }
  49.                                 }
  50.                         }catch(Exception e){
  51.                                 log.info("Error:"+e.getMessage()+",Row:"+Bytes.toStringBinary(row.get())+
  52.                                                 ",Value:"+value);
  53.                                 cxt.getCounter(Counters.ERROR).increment(1);
  54.                         }
  55.                 }
  56.                 private boolean equals(byte[] a,byte[] b){
  57.                         String aStr= Bytes.toString(a);
  58.                         String bStr= Bytes.toString(b);
  59.                         if(aStr.equals(bStr)){
  60.                                 return true;
  61.                         }
  62.                         return false;
  63.                 }
  64.         }
  65.         
  66.         public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
  67.                 byte[] columnFamily = Bytes.toBytes("fam");
  68.                 byte[] columnQualifier = Bytes.toBytes("data");
  69.                 Scan scan = new Scan ();
  70.                 scan.addColumn(columnFamily, columnQualifier);
  71.                 HadoopUtils.initialConf("hbase");
  72.                 Configuration conf = HadoopUtils.getConf();
  73.                 conf.set("conf.columnfamily", Bytes.toStringBinary(columnFamily));
  74.                 conf.set("conf.columnqualifier", Bytes.toStringBinary(columnQualifier));
  75.                
  76.                 String input ="testtable" ;//
  77.                 String output="testtable1"; //
  78.                                 
  79.                 Job job = new Job(conf,"Parse data in "+input+",write to"+output);
  80.                 job.setJarByClass(ParseDriver.class);
  81.                 TableMapReduceUtil.initTableMapperJob(input, scan, ParseMapper.class,
  82.                                 ImmutableBytesWritable.class, Put.class,job);
  83.                 TableMapReduceUtil.initTableReducerJob(output, IdentityTableReducer.class, job);
  84.                
  85.                 System.exit(job.waitForCompletion(true)?0:1);
  86.                
  87.         }
  88. }
复制代码
其中HadoopUtils代码如下:
  1. package hadoop.util;
  2. import java.io.IOException;
  3. import java.net.URI;
  4. import java.util.ArrayList;
  5. import java.util.List;
  6. import org.apache.hadoop.conf.Configuration;
  7. import org.apache.hadoop.fs.FSDataInputStream;
  8. import org.apache.hadoop.fs.FileSystem;
  9. import org.apache.hadoop.fs.Path;
  10. import org.apache.hadoop.io.Text;
  11. import org.apache.hadoop.util.LineReader;
  12. public class HadoopUtils {
  13.         private static Configuration conf;
  14.         public  static void initialConf(){
  15.                 conf = new Configuration();
  16.                 conf.set("mapred.job.tracker", "hbase:9001");
  17.                 conf.set("fs.default.name", "hbase:9000");
  18.                 conf.set("hbase.zookeeper.quorum", "hbase");
  19.         }
  20.         public  static void initialConf(String host){
  21.                 conf = new Configuration();
  22.                 conf.set("mapred.job.tracker", host+":9001");
  23.                 conf.set("fs.default.name", host+":9000");
  24.                 conf.set("hbase.zookeeper.quorum", host);
  25.         }
  26.         public static Configuration getConf(){
  27.                 if(conf==null){
  28.                         initialConf();
  29.                 }
  30.                 return conf;
  31.         }
  32.         
  33.         public static List<String> readFromHDFS(String fileName) throws IOException {
  34.                 Configuration conf = getConf();
  35.                 FileSystem fs = FileSystem.get(URI.create(fileName), conf);
  36.                 FSDataInputStream hdfsInStream = fs.open(new Path(fileName));
  37.                 // 按行读取(新版本的方法)
  38.                 LineReader inLine = new LineReader(hdfsInStream, conf);
  39.                 Text txtLine = new Text();
  40.                
  41.                 int iResult = inLine.readLine(txtLine); //读取第一行
  42.                 List<String> list = new ArrayList<String>();
  43.                 while (iResult > 0 ) {
  44.                         list.add(txtLine.toString());
  45.                         iResult = inLine.readLine(txtLine);
  46.                 }
  47.                
  48.                 hdfsInStream.close();
  49.                 fs.close();
  50.                 return list;
  51.         }
  52. }
复制代码

2.3 MR和HTable结合,代码如下:
  1. package hbase.mr;
  2. import hadoop.util.HadoopUtils;
  3. import hbase.mr.AnalyzeDriver.Counters;
  4. import java.io.IOException;
  5. import java.util.Date;
  6. import org.apache.hadoop.conf.Configuration;
  7. import org.apache.hadoop.hbase.KeyValue;
  8. import org.apache.hadoop.hbase.client.HTable;
  9. import org.apache.hadoop.hbase.client.Put;
  10. import org.apache.hadoop.hbase.client.Result;
  11. import org.apache.hadoop.hbase.client.Scan;
  12. import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
  13. import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
  14. import org.apache.hadoop.hbase.mapreduce.TableMapper;
  15. import org.apache.hadoop.hbase.util.Bytes;
  16. import org.apache.hadoop.io.Writable;
  17. import org.apache.hadoop.mapreduce.Job;
  18. import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
  19. import org.slf4j.Logger;
  20. import org.slf4j.LoggerFactory;
  21. public class ParseSinglePutDriver {
  22.         /**
  23.          * 使用HTable进行写入
  24.          * 把infoTable 表中的 qualifier字段复制到qualifier1字段
  25.          * 单个Put
  26.          * @param args
  27.          */
  28.         private static Logger log = LoggerFactory.getLogger(ParseMapper.class);
  29.         static class ParseMapper extends TableMapper<ImmutableBytesWritable,Writable>{
  30.                 private HTable infoTable =null ;
  31.                 private byte[] columnFamily =null ;
  32.                 private byte[] columnQualifier =null;
  33.                 private byte[] columnQualifier1 =null;
  34.                 @Override
  35.                 protected void setup(Context cxt){
  36.                         log.info("ParseSinglePutDriver setup,current time: "+new Date());
  37.                         try {
  38.                                 infoTable = new HTable(cxt.getConfiguration(),
  39.                                                 cxt.getConfiguration().get("conf.infotable"));
  40.                                 infoTable.setAutoFlush(false);
  41.                         } catch (IOException e) {
  42.                                 log.error("Initial infoTable error:\n"+e.getMessage());
  43.                         }
  44.                         columnFamily = Bytes.toBytes(cxt.getConfiguration().get("conf.columnfamily"));
  45.                         columnQualifier = Bytes.toBytes(cxt.getConfiguration().get("conf.columnqualifier"));
  46.                         columnQualifier1 = Bytes.toBytes(cxt.getConfiguration().get("conf.columnqualifier1"));
  47.                 }
  48.                 @Override
  49.                 protected void cleanup(Context cxt){
  50.                         try {
  51.                                 infoTable.flushCommits();
  52.                                 log.info("ParseSinglePutDriver cleanup ,current time :"+new Date());
  53.                         } catch (IOException e) {
  54.                                 log.error("infoTable flush commits error:\n"+e.getMessage());
  55.                         }
  56.                 }
  57.                 @Override
  58.                 public void map(ImmutableBytesWritable row,Result columns,Context cxt){
  59.                         cxt.getCounter(Counters.ROWS).increment(1);
  60.                         String value =null ;
  61.                         try{
  62.                                 Put put = new Put(row.get());
  63.                                 for(KeyValue kv : columns.list()){
  64.                                         cxt.getCounter(Counters.COLS).increment(1);
  65.                                         value= Bytes.toStringBinary(kv.getValue());
  66.                                         if(equals(columnQualifier,kv.getQualifier())){  // 过滤column
  67.                                                 put.add(columnFamily,columnQualifier1,kv.getValue());
  68.                                                 infoTable.put(put);
  69.                                         }
  70.                                 }
  71.                         }catch(Exception e){
  72.                                 log.info("Error:"+e.getMessage()+",Row:"+Bytes.toStringBinary(row.get())+
  73.                                                 ",Value:"+value);
  74.                                 cxt.getCounter(Counters.ERROR).increment(1);
  75.                         }
  76.                 }
  77.                 private boolean equals(byte[] a,byte[] b){
  78.                         String aStr= Bytes.toString(a);
  79.                         String bStr= Bytes.toString(b);
  80.                         if(aStr.equals(bStr)){
  81.                                 return true;
  82.                         }
  83.                         return false;
  84.                 }
  85.         }
  86.         public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
  87.                 String input ="testtable";
  88.                 byte[] columnFamily = Bytes.toBytes("fam");
  89.                 byte[] columnQualifier = Bytes.toBytes("data");
  90.                 byte[] columnQualifier1 = Bytes.toBytes("data1");
  91.                 Scan scan = new Scan ();
  92.                 scan.addColumn(columnFamily, columnQualifier);
  93.                 HadoopUtils.initialConf("hbase");
  94.                 Configuration conf = HadoopUtils.getConf();
  95.                 conf.set("conf.columnfamily", Bytes.toStringBinary(columnFamily));
  96.                 conf.set("conf.columnqualifier", Bytes.toStringBinary(columnQualifier));
  97.                 conf.set("conf.columnqualifier1", Bytes.toStringBinary(columnQualifier1));
  98.                 conf.set("conf.infotable", input);
  99.                
  100.                 Job job = new Job(conf,"Parse data in "+input+",into tables");
  101.                 job.setJarByClass(ParseSinglePutDriver.class);
  102.                 TableMapReduceUtil.initTableMapperJob(input, scan, ParseMapper.class,
  103.                                 ImmutableBytesWritable.class, Put.class,job);        
  104.                 job.setOutputFormatClass(NullOutputFormat.class);
  105.                 job.setNumReduceTasks(0);
  106.                 System.exit(job.waitForCompletion(true)?0:1);
  107.         }
  108. }
复制代码

2.4 上面2.3中的HTable其实也是可以put一个List的,下面的方式就是put一个list的方式,这样效率会高。
  1. package hbase.mr;
  2. import hadoop.util.HadoopUtils;
  3. import hbase.mr.AnalyzeDriver.Counters;
  4. import java.io.IOException;
  5. import java.util.ArrayList;
  6. import java.util.Date;
  7. import java.util.List;
  8. import org.apache.hadoop.conf.Configuration;
  9. import org.apache.hadoop.hbase.KeyValue;
  10. import org.apache.hadoop.hbase.client.HTable;
  11. import org.apache.hadoop.hbase.client.Put;
  12. import org.apache.hadoop.hbase.client.Result;
  13. import org.apache.hadoop.hbase.client.Scan;
  14. import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
  15. import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
  16. import org.apache.hadoop.hbase.mapreduce.TableMapper;
  17. import org.apache.hadoop.hbase.util.Bytes;
  18. import org.apache.hadoop.io.Writable;
  19. import org.apache.hadoop.mapreduce.Job;
  20. import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
  21. import org.slf4j.Logger;
  22. import org.slf4j.LoggerFactory;
  23. public class ParseListPutDriver {
  24.         /**
  25.          * 使用HTable进行写入
  26.          * List <Put> 进行测试,查看效率
  27.          * 把infoTable 表中的 qualifier字段复制到qualifier1字段
  28.          * @param args
  29.          */
  30.         private static Logger log = LoggerFactory.getLogger(ParseMapper.class);
  31.         static class ParseMapper extends TableMapper<ImmutableBytesWritable,Writable>{
  32.                 private HTable infoTable =null ;
  33.                 private byte[] columnFamily =null ;
  34.                 private byte[] columnQualifier =null;
  35.                 private byte[] columnQualifier1 =null;
  36.                 private List<Put> list = new ArrayList<Put>();
  37.                 @Override
  38.                 protected void setup(Context cxt){
  39.                         log.info("ParseListPutDriver setup,current time: "+new Date());
  40.                         try {
  41.                                 infoTable = new HTable(cxt.getConfiguration(),
  42.                                                 cxt.getConfiguration().get("conf.infotable"));
  43.                                 infoTable.setAutoFlush(false);
  44.                         } catch (IOException e) {
  45.                                 log.error("Initial infoTable error:\n"+e.getMessage());
  46.                         }
  47.                         columnFamily = Bytes.toBytes(cxt.getConfiguration().get("conf.columnfamily"));
  48.                         columnQualifier = Bytes.toBytes(cxt.getConfiguration().get("conf.columnqualifier"));
  49.                         columnQualifier1 = Bytes.toBytes(cxt.getConfiguration().get("conf.columnqualifier1"));
  50.                 }
  51.                 @Override
  52.                 protected void cleanup(Context cxt){
  53.                         try {
  54.                                 infoTable.put(list);
  55.                                 infoTable.flushCommits();
  56.                                 log.info("ParseListPutDriver cleanup ,current time :"+new Date());
  57.                         } catch (IOException e) {
  58.                                 log.error("infoTable flush commits error:\n"+e.getMessage());
  59.                         }
  60.                 }
  61.                 @Override
  62.                 public void map(ImmutableBytesWritable row,Result columns,Context cxt){
  63.                         cxt.getCounter(Counters.ROWS).increment(1);
  64.                         String value =null ;
  65.                         try{
  66.                                 Put put = new Put(row.get());
  67.                                 for(KeyValue kv : columns.list()){
  68.                                         cxt.getCounter(Counters.COLS).increment(1);
  69.                                         value= Bytes.toStringBinary(kv.getValue());
  70.                                         if(equals(columnQualifier,kv.getQualifier())){  // 过滤column
  71.                                                 put.add(columnFamily,columnQualifier1,kv.getValue());
  72.                                                 list.add(put);
  73.                                         }
  74.                                 }
  75.                         }catch(Exception e){
  76.                                 log.info("Error:"+e.getMessage()+",Row:"+Bytes.toStringBinary(row.get())+
  77.                                                 ",Value:"+value);
  78.                                 cxt.getCounter(Counters.ERROR).increment(1);
  79.                         }
  80.                 }
  81.                 private boolean equals(byte[] a,byte[] b){
  82.                         String aStr= Bytes.toString(a);
  83.                         String bStr= Bytes.toString(b);
  84.                         if(aStr.equals(bStr)){
  85.                                 return true;
  86.                         }
  87.                         return false;
  88.                 }
  89.         }
  90.         public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
  91.                 String input ="testtable";
  92.                 byte[] columnFamily = Bytes.toBytes("fam");
  93.                 byte[] columnQualifier = Bytes.toBytes("data");
  94.                 byte[] columnQualifier1 = Bytes.toBytes("data2");
  95.                 Scan scan = new Scan ();
  96.                 scan.addColumn(columnFamily, columnQualifier);
  97.                 HadoopUtils.initialConf("hbase");
  98.                 Configuration conf = HadoopUtils.getConf();
  99.                 conf.set("conf.columnfamily", Bytes.toStringBinary(columnFamily));
  100.                 conf.set("conf.columnqualifier", Bytes.toStringBinary(columnQualifier));
  101.                 conf.set("conf.columnqualifier1", Bytes.toStringBinary(columnQualifier1));
  102.                 conf.set("conf.infotable", input);
  103.                
  104.                 Job job = new Job(conf,"Parse data in "+input+",into tables");
  105.                 job.setJarByClass(ParseListPutDriver.class);
  106.                 TableMapReduceUtil.initTableMapperJob(input, scan, ParseMapper.class,
  107.                                 ImmutableBytesWritable.class, Put.class,job);        
  108.                 job.setOutputFormatClass(NullOutputFormat.class);
  109.                 job.setNumReduceTasks(0);
  110.                 System.exit(job.waitForCompletion(true)?0:1);
  111.         }
  112. }
复制代码

数据记录条数为:26632,可以看到下面图片中的时间记录对比:
1.jpg

2.png

由于结合了hbase,所以需要在hadoop_home/lib目录下面加些额外的包,其整个包如下(hbase1.0.jar为编译打包的MR程序):
3.jpg






blog地址:http://blog.csdn.net/fansy1990

已有(8)人评论

跳转到指定楼层
释怀 发表于 2016-7-22 16:12:17
要是能多点注释对初学者来说就完美了
回复

使用道具 举报

天天哥们 发表于 2016-7-24 15:54:44
HBase高级应用实战视频教程—— http:// pan .baidu. com/s/1c2AzvOW 密码: n7vr
回复

使用道具 举报

George-zqq 发表于 2016-12-29 17:47:02
很好,学习学习
回复

使用道具 举报

sunbrillant 发表于 2017-9-1 22:57:35
之前用的都是MR读写,现在发现原来有这么多方式
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条