分享

HBase 利用Coprocessor实现聚合函数

问题导读:
1、HBase默认不支持聚合函数,那我们该用什么来实现呢 ?
2、怎么用编程的方式去实现呢 ?





HBase默认不支持聚合函数(sum,avg等)。可利用HBase的coprocessor特性实现。这样做的好处是利用regionserver在服务端进行运算。效率高,避免客户端取回大量数据,占用网络带宽,消耗大量内存等。

实现方式:

利用HBase提供的endPoint类型的AggregateImplementation Coprocess,配合AggregationClient访问客户端实现RegionServer端的集合计算。AggregationClient访问代码如下:

  1. aggregationClient.avg(Bytes. toBytes("TableName"), ci, scan);
复制代码

scan即为要计算列的查询条件。这里有一个ColumnInterperter类型的参数ci。即列解释器,用于解析列中的值。HBase默认提供了LongColumnInterpreter。而我要处理的值是double类型的,所以先实现了一个DoubleColumnInterpreter。(从JIRA上看Doulbe类型的解释器好像正在开发中)。ColumnInterpreter接口的实现会在AggregateImplementation
  1. /**
  2. * Double类型的列解释器实现
  3. *
  4. * @author OneCoder
  5. */
  6. public class DoubleColumnInterpreter implements
  7.            ColumnInterpreter<Double, Double> {
  8.      @Override
  9.      public void write(DataOutput out) throws IOException {
  10.      }
  11.      @Override
  12.      public void readFields(DataInput in) throws IOException {
  13.      }
  14.      @Override
  15.      public Double getValue( byte[] colFamily, byte[] colQualifier, KeyValue kv)
  16.                  throws IOException {
  17.             if (kv == null)
  18.                  return null;
  19.             // 临时解决方案,如果采用Bytes.toDouble(kv.getValue())会报错,偏移量大于总长度。
  20.             // toDouble(getBuffer(), getValueOffset),偏移量也不对。
  21.             return Double. valueOf(new String(kv.getValue()));
  22.      }
  23.      @Override
  24.      public Double add(Double l1, Double l2) {
  25.             if (l1 == null ^ l2 == null) {
  26.                  return l1 == null ? l2 : l1;
  27.            } else if (l1 == null) {
  28.                  return null;
  29.            }
  30.             return l1 + l2;
  31.      }
  32.      @Override
  33.      public Double getMaxValue() {
  34.             // TODO Auto-generated method stub
  35.             return null;
  36.      }
  37.      @Override
  38.      public Double getMinValue() {
  39.             // TODO Auto-generated method stub
  40.             return null;
  41.      }
  42.      @Override
  43.      public Double multiply(Double o1, Double o2) {
  44.             if (o1 == null ^ o2 == null) {
  45.                  return o1 == null ? o2 : o1;
  46.            } else if (o1 == null) {
  47.                  return null;
  48.            }
  49.             return o1 * o2;
  50.      }
  51.      @Override
  52.      public Double increment(Double o) {
  53.             // TODO Auto-generated method stub
  54.             return null;
  55.      }
  56.      @Override
  57.      public Double castToReturnType(Double o) {
  58.             return o.doubleValue();
  59.      }
  60.      @Override
  61.      public int compare(Double l1, Double l2) {
  62.             if (l1 == null ^ l2 == null) {
  63.                  return l1 == null ? -1 : 1; // either of one is null.
  64.            } else if (l1 == null)
  65.                  return 0; // both are null
  66.             return l1.compareTo(l2); // natural ordering.
  67.      }
  68.      @Override
  69.      public double divideForAvg(Double o, Long l) {
  70.             return (o == null || l == null) ? Double. NaN : (o.doubleValue() / l
  71.                      .doubleValue());
  72.      }
  73. }
复制代码

导出jar包上传到HBase Region节点的lib下。然后配置RegionServer的Coprocessor。在服务端hbase-site.xml中,增加
  1. <property>
  2.             <name >hbase.coprocessor.region.classes </name >
  3.            <value >org.apache.hadoop.hbase.coprocessor.AggregateImplementation </value >
  4. </property >   
复制代码

最后,我们重启服务,使配置和jar生效。然后调用AggregationClient中提供的avg, max等聚合函数,即可在region端计算出结果,返回。


最后,感谢原作者的分享:本文出自




已有(8)人评论

跳转到指定楼层
linguobao 发表于 2014-7-1 17:19:44
楼主,请教下:
AggregationClient的rowCount所调用的服务器端AggregateImplementation的getRowNum方法,和RowCountEndpoint的getRowCount方法,大方向感觉是一样的,但是,RowCountEndpoint采用了protobuf。这有什么区别???
回复

使用道具 举报

ohano_javaee 发表于 2014-10-19 00:44:10
请问AggregationClient是自己写的类吗如果不是在哪个包下?
回复

使用道具 举报

howtodown 发表于 2014-10-19 01:05:54
ohano_javaee 发表于 2014-10-19 00:44
请问AggregationClient是自己写的类吗如果不是在哪个包下?

hbase 自带的AggregationClient只能对单一列族的单一列进行聚合。
回复

使用道具 举报

ohano_javaee 发表于 2014-10-19 13:17:38
我试了一下,似乎不行。
org.apache.hadoop.hbase.coprocessor.ColumnInterpreter是一个抽象类,不是接口。
AggregationClient这个类我也没找到。我用的是hbase-0.96.2-hadoop2。不知道是不是版本的原因。
回复

使用道具 举报

wubaozhou 发表于 2015-1-1 22:36:49
回复

使用道具 举报

Rommy.Yang 发表于 2016-7-25 20:03:30
如果想实现group后sum怎么做?
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条