分享

hbase中自定义Coprocessor

fanbells 2014-3-27 00:24:00 发表于 问题解答 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 3 11652
1、为什么要自定义Coprocessor

已有(3)人评论

跳转到指定楼层
fanbells 发表于 2014-3-27 00:26:18
hbase中自带了一个AggregationClient的协处理器,可以进行count、avg等函数,但是感觉不好用,查看了AggregationClient的源码发现在使用是必须指定列簇,而且只指定一个列簇,如果指定多个列簇会报Exception in thread "main" java.io.IOException: There must be only one family,但是在统计是如果有一个列簇下的有些列没有值,统计结果就是0。红色部分就是AggregateImplementation的内部使用规则,只获取第一个列簇。
源码内容:
  1. public <T, S> long getRowNum(ColumnInterpreter<T, S> ci, Scan scan)
  2.   throws IOException {
  3.    long counter = 0l;
  4.    List<KeyValue> results = new ArrayList<KeyValue>();
  5.    byte[] colFamily = scan.getFamilies()[0];
  6.     byte[] qualifier = scan.getFamilyMap().get(colFamily).pollFirst();
  7.     if (scan.getFilter() == null && qualifier == null)
  8.       scan.setFilter(new FirstKeyOnlyFilter());
  9.     InternalScanner scanner = ((RegionCoprocessorEnvironment) getEnvironment())
  10.         .getRegion().getScanner(scan);
  11.    try {</font>
  12.     boolean hasMoreRows = false;
  13.      do {</font>
  14.         hasMoreRows = scanner.next(results);
  15.       if (results.size() > 0) {
  16.           counter++;</font>
  17.     }
  18.        results.clear();
  19.       } while (hasMoreRows);
  20.     } finally {
  21.       scanner.close();
  22.     }
  23.    log.info("Row counter from this region is "
  24.        + ((RegionCoprocessorEnvironment) getEnvironment()).getRegion()
  25.          .getRegionNameAsString() + ": " + counter);
  26.     return counter;
  27.   }
复制代码


因为使用有些不方便,所以介绍一下如何进行自定义协处理器。
hbase中协处理器分两种类型,一个是观察者(observer),类似于关系数据库的触发器。另一个是终端(endpoint),动态的终端有点像存储过程,现在我介绍的是第二种endpoint。下面是一个统计的例子。
1、定义一个接口CustomProtocol实现CoprocessorProtocol;
    CustomProtocol中可以添加自己需要的方法,比如count方法;

  1. public interface CustomProtocol extends CoprocessorProtocol{
  2.     public long rowCount(Scan scan,Filter filter) throws IOException;
  3. }
复制代码



2、定义一个类CustomProtocolIm继承BaseEndpointCoprocessor,实现CoprocessorProtocol;

  1. public class CustomProtocolIm extends BaseEndpointCoprocessor implements CustomProtocol{
  2.             @Override
  3.             public long rowCount(Scan scan, Filter filter) throws IOException {
  4.                         //在服务端进行统计
  5.                         long counter = 0l;
  6.                List<KeyValue> results = new ArrayList<KeyValue>();
  7.                if (scan.getFilter() == null)
  8.                  scan.setFilter(new FirstKeyOnlyFilter());
  9.                InternalScanner scanner = ((RegionCoprocessorEnvironment) getEnvironment())
  10.                    .getRegion().getScanner(scan);
  11.                try {
  12.                  boolean hasMoreRows = false;
  13.                  do {
  14.                    hasMoreRows = scanner.next(results);
  15.                    if (results.size() > 0) {
  16.                      counter++;
  17.                    }
  18.                    results.clear();
  19.                  } while (hasMoreRows);
  20.                } finally {
  21.                  scanner.close();
  22.                }
  23.                return counter;
  24.             }
  25. }
复制代码


以上是现对服务端,也就是regionserver端来说的。


3、定义一个客户端类CustomRowCountClient;
  1. public class CustomRowCountClient {
  2.    
  3.     public long rowcount(HTable htable) throws Throwable{
  4.             Map<byte[],Long> results = htable.coprocessorExec(CustomProtocol.class, null, null, new     Batch.Call<CustomProtocol,Long>() {//Long代表CustomProtocol的返回值类型
  5.                 @Override
  6.                 public Long call(CustomProtocol rcp) throws IOException {
  7.                 Scan scan = new Scan();
  8.                 scan.setCaching(500);
  9.                 scan.setCacheBlocks(false);
  10.                 return rcp.rowCount(scan, null);
  11.                 }
  12.      });
  13.             //将regionserver统计的数据进行汇总
  14.             long total = 0;
  15.             for(Map.Entry<byte[],Long> e : results.entrySet()){
  16.             total+=e.getValue().longValue();
  17.             }
  18.             return total;
  19.             }
  20. }
复制代码


4、在hbase集群进行部署,在hbase安装目录conf下,修改hbase-site.xml,添加
  1. <property>
  2.                 <name>hbase.coprocessor.region.classes</name>
  3.                 <value>custom.coprocessor.CustomProtocolIm</value>
  4. </property>
复制代码

如果有多个协处理器使用逗号进行分隔。

5、重启集群,就可以通过访问CustomRowCountClient进行统计了。也可以在shell命令中这样操作

  1. hbase(main):003:0> disable 'test'
  2. hbase(main):003:0>alter 'test', METHOD => 'table_att','coprocessor'=>'|custom.coprocessor.CustomProtocolIm||'
  3. hbase> enable 'test'
复制代码

这种方法只会对指定的表生效。

回复

使用道具 举报

wkzyehui 发表于 2014-3-27 16:09:38
不明觉厉啊
回复

使用道具 举报

wkzyehui 发表于 2014-3-27 16:10:10
好好学习天天向上
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条