立即注册 登录
About云-梭伦科技 返回首页

sunshine_junge的个人空间 https://www.aboutyun.com/?3779 [收藏] [复制] [分享] [RSS]

日志

Hbase建索引分析

已有 2744 次阅读2014-8-1 21:14


1. Hbase的一个例子,IndexBuilder.建索引.代码及解析如下: 
Java代码  
  1. /** 
  2.  * Copyright 2009 The Apache Software Foundation 
  3.  * 
  4.  * Licensed to the Apache Software Foundation (ASF) under one 
  5.  * or more contributor license agreements.  See the NOTICE file 
  6.  * distributed with this work for additional information 
  7.  * regarding copyright ownership.  The ASF licenses this file 
  8.  * to you under the Apache License, Version 2.0 (the 
  9.  * "License"); you may not use this file except in compliance 
  10.  * with the License.  You may obtain a copy of the License at 
  11.  * 
  12.  *     http://www.apache.org/licenses/LICENSE-2.0 
  13.  * 
  14.  * Unless required by applicable law or agreed to in writing, software 
  15.  * distributed under the License is distributed on an "AS IS" BASIS, 
  16.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
  17.  * See the License for the specific language governing permissions and 
  18.  * limitations under the License. 
  19.  */  
  20. package org.frame.base.hbase.hadoop.hbase;  
  21.   
  22. import java.io.ByteArrayOutputStream;  
  23. import java.io.DataOutputStream;  
  24. import java.io.IOException;  
  25. import java.util.HashMap;  
  26.   
  27. import org.apache.hadoop.conf.Configuration;  
  28. import org.apache.hadoop.hbase.HBaseConfiguration;  
  29. import org.apache.hadoop.hbase.client.Put;  
  30. import org.apache.hadoop.hbase.client.Result;  
  31. import org.apache.hadoop.hbase.client.Scan;  
  32. import org.apache.hadoop.hbase.io.ImmutableBytesWritable;  
  33. import org.apache.hadoop.hbase.mapreduce.MultiTableOutputFormat;  
  34. import org.apache.hadoop.hbase.mapreduce.TableInputFormat;  
  35. import org.apache.hadoop.hbase.util.Base64;  
  36. import org.apache.hadoop.hbase.util.Bytes;  
  37. import org.apache.hadoop.io.Writable;  
  38. import org.apache.hadoop.mapreduce.Job;  
  39. import org.apache.hadoop.mapreduce.Mapper;  
  40. import org.apache.hadoop.util.GenericOptionsParser;  
  41.   
  42. /** 
  43.  * 基本列值,创建索引. 
  44.  */  
  45.   
  46. /** 
  47.  * Example map/reduce job to construct index tables that can be used to quickly 
  48.  * find a row based on the value of a column. It demonstrates: 
  49.  * [list] 
  50.  * <li>Using TableInputFormat and TableMapReduceUtil to use an HTable as input 
  51.  * to a map/reduce job.
  52.  
  53.  * [*]Passing values from main method to children via the configuration. 
  54.  
  55.  * <li>Using MultiTableOutputFormat to output to multiple tables from a 
  56.  * map/reduce job.
  57.  
  58.  * [*]A real use case of building a secondary index over a table. 
  59.  
  60.  * [/list] 
  61.  * 
  62.  * <h3>Usage</h3> 
  63.  * 
  64.  *  
  65.  * Modify ${HADOOP_HOME}/conf/hadoop-env.sh to include the hbase jar, the 
  66.  * zookeeper jar, the examples output directory, and the hbase conf directory in 
  67.  * HADOOP_CLASSPATH, and then run 
  68.  * <tt>[b]bin/hadoop org.apache.hadoop.hbase.mapreduce.IndexBuilder TABLE_NAME COLUMN_FAMILY ATTR [ATTR ...][/b]</tt> 
  69.  *  
  70.  
  71.  * 
  72.  *  
  73.  * To run with the sample data provided in index-builder-setup.rb, use the 
  74.  * arguments [b]<tt>people attributes name email phone</tt>[/b]. 
  75.  *  
  76.  
  77.  * 
  78.  *  
  79.  * This code was written against HBase 0.21 trunk. 
  80.  *  
  81.  
  82.  */  
  83. public class IndexBuilder {  
  84.   /** the column family containing the indexed row key */  
  85.   public static final byte[] INDEX_COLUMN = Bytes.toBytes("INDEX");  
  86.   /** the qualifier containing the indexed row key */  
  87.   public static final byte[] INDEX_QUALIFIER = Bytes.toBytes("ROW");  
  88.   
  89.   /** 
  90.    * Internal Mapper to be run by Hadoop. 
  91.    */  
  92.   public static class Map extends  
  93.       Mapper<ImmutableBytesWritable, Result, ImmutableBytesWritable, Writable> {  
  94.     private byte[] family;  
  95.     private HashMap<byte[], ImmutableBytesWritable> indexes;  
  96.   
  97.     @Override  
  98.     protected void map(ImmutableBytesWritable rowKey, Result result, Context context)  
  99.         throws IOException, InterruptedException {  
  100.       for(java.util.Map.Entry<byte[], ImmutableBytesWritable> index : indexes.entrySet()) {  
  101.         byte[] qualifier = index.getKey();  
  102.         ImmutableBytesWritable tableName = index.getValue();  
  103.         byte[] value = result.getValue(family, qualifier);  
  104.         if (value != null) {  
  105.           // original: row 123 attribute:phone 555-1212  
  106.           // index: row 555-1212 INDEX:ROW 123  
  107.           Put put = new Put(value);  
  108.           put.add(INDEX_COLUMN, INDEX_QUALIFIER, rowKey.get());  
  109.           context.write(tableName, put);  
  110.         }  
  111.       }  
  112.     }  
  113.   
  114.     @Override  
  115.     protected void setup(Context context) throws IOException,  
  116.         InterruptedException {  
  117.       Configuration configuration = context.getConfiguration();  
  118.       String tableName = configuration.get("index.tablename");  
  119.       String[] fields = configuration.getStrings("index.fields");  
  120.       String familyName = configuration.get("index.familyname");  
  121.       family = Bytes.toBytes(familyName);  
  122.       indexes = new HashMap<byte[], ImmutableBytesWritable>();  
  123.       for(String field : fields) {  
  124.         // if the table is "people" and the field to index is "email", then the  
  125.         // index table will be called "people-email"  
  126.         indexes.put(Bytes.toBytes(field),  
  127.             new ImmutableBytesWritable(Bytes.toBytes(tableName + "-" + field)));  
  128.       }  
  129.     }  
  130.   }  
  131.   
  132.   /** 
  133.    * Job configuration. 
  134.    */  
  135.   public static Job configureJob(Configuration conf, String [] args)  
  136.   throws IOException {  
  137.     String tableName = args[0];  
  138.     String columnFamily = args[1];  
  139.     System.out.println("****" + tableName);  
  140.     conf.set(TableInputFormat.SCAN, convertScanToString(new Scan()));  
  141.     conf.set(TableInputFormat.INPUT_TABLE, tableName);  
  142.     conf.set("index.tablename", tableName);  
  143.     conf.set("index.familyname", columnFamily);  
  144.     String[] fields = new String[args.length - 2];  
  145.     for(int i = 0; i < fields.length; i++) {  
  146.       fields[i] = args[i + 2];  
  147.     }  
  148.     conf.setStrings("index.fields", fields);  
  149.     //conf.set("index.familyname", "attributes");  
  150.     Job job = new Job(conf, tableName);  
  151.     job.setJarByClass(IndexBuilder.class);  
  152.     job.setMapperClass(Map.class);  
  153.     job.setNumReduceTasks(0);//mapred.reduce.tasks,设置为0表示不需要reduce.  
  154.     job.setInputFormatClass(TableInputFormat.class);  
  155.     job.setOutputFormatClass(MultiTableOutputFormat.class);  
  156.     return job;  
  157.   }  
  158.   
  159.   /** 
  160.    * 把scan转为String类型 
  161.    * @param scan 
  162.    * @return 
  163.    * @throws IOException 
  164.    */  
  165.   private static String convertScanToString(Scan scan) throws IOException{  
  166.       ByteArrayOutputStream out = new ByteArrayOutputStream();  
  167.       DataOutputStream dos = new DataOutputStream(out);  
  168.       scan.write(dos);  
  169.       return Base64.encodeBytes(out.toByteArray());  
  170.   }  
  171.   
  172.   public static void main(String[] args) throws Exception {  
  173.     Configuration conf = HBaseConfiguration.create();  
  174.     String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();  
  175.     if(otherArgs.length < 3) {  
  176.       System.err.println("Only " + otherArgs.length + " arguments supplied, required: 3");  
  177.       System.err.println("Usage: IndexBuilder <TABLE_NAME> <COLUMN_FAMILY> <ATTR> [<ATTR> ...]");  
  178.       System.exit(-1);  
  179.     }  
  180.     Job job = configureJob(conf, otherArgs);  
  181.     System.exit(job.waitForCompletion(true) ? 0 : 1);  
  182.   }  
  183. }  

首先需要在Hbase里面建一张表,然后放入一点测试数据,数据结构如下: 
Java代码  
  1. /** 
  2.    原始表 
  3.  * table1 
  4.  *          family1:column1   family1:column2 
  5.  * key1          value1          value11 
  6.  * key2          value2          value22 
  7.  * 
  8.    索引表 
  9.  * table1-column1 
  10.  * 
  11.  *          INDEX:ROW 
  12.  * value1      key1 
  13.  * value2      key2 
  14.  * 
  15.  */  

这是我们预想的,执行命令: 
Java代码  
  1. bin/hadoop IndexBuilder.jar IndexBuilder 'table1' 'family1' 'column1'  

指定参数执行建立Hbase索引表,当然这是一对一的索引建立方法,如果column1可能为相同的,就不能这么建索引了。 
不过可以采用: 
Java代码  
  1. value1_key1  empty  
  2. value2_key2  empty  

采用前缀的方式,这样Hbase进行搜索的时候,先根据value1得到他的key列表,然后再通过key列表搜索值. 


 



引用:http://a123159521.iteye.com/blog/1239426

路过

雷人

握手

鲜花

鸡蛋

评论 (0 个评论)

facelist doodle 涂鸦板

您需要登录后才可以评论 登录 | 立即注册

关闭

推荐上一条 /2 下一条