分享

HBase-1.0.1.1 编写Coprocessor【分享】

bob007 发表于 2016-1-24 18:43:59 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 1 13800
本帖最后由 bob007 于 2016-1-24 18:45 编辑

这几天为了满足业务需求,自己写了个Coprocessor,这里写一篇博客记录一下。

使用Coprocessor的目的是这样的,假如你的业务使你不得不进行全表查询,如果使用传统的Scan的话,那么全表查询将会给集群带来高的带宽压力,而且可能Client端也负载不了海量数据的计算。HBase提供了AggregateImplementation,可以进行简单的例如计算sum、average等操作,但是这些操作都是只针对一个列进行的,我遇到的情况是需要同时统计多个列,因此,就只能自己写Coprocessor了。

HBase的Coprocessor是运行在RegionServer上的一段程序,有点类似于关系型数据库中的触发器和存储过程。这个Coprocessor通常有两种类型,分别是Observer和Endpoint。Oberserver相当于触发器,可用于建立二级索引等操作。Endpoint相当于存储过程,可用于在各个RegionServer上做一些计算等,然后将计算的结果汇集到Client端来做最后的处理,这有点儿像Map/Reduce的过程。我这几天是将Coprocessor用作Endpoint,所以这里只对编写Endpoint的代码进行介绍。

在HBase版本0.96以后,HBase的RPC框架采用的就是protobuf作为通讯协议,关于protobuf的介绍网上有很多,这里就不再介绍了。


1.首先需要下载protobuf的解析器protobuf-2.5.0.tar.gz,然后,按照如下的方式安装解析器。
[mw_shl_code=bash,true]tar -zxvf protobuf-2.5.0.tar.gz
./configure --prefix=/home/cyber_space/protobuf
make
make install[/mw_shl_code]


2.编写.proto文件

这个.proto文件我是仿照hbase源代码中的两个Example写的,这两个Example的源代码路径在如下路径中,


[mw_shl_code=bash,true]/hbase-1.0.1.1_src/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/RowCountEndpoint.java
/hbase-1.0.1.1_src/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteEndpoint.java[/mw_shl_code]
它们的.proto文件的路径在如下的路径中,
[mw_shl_code=bash,true]~/hbase-1.0.1.1_src/hbase-examples/src/main/protobuf/Examples.proto
~/hbase-1.0.1.1_src/hbase-examples/src/main/protobuf/BulkDelete.proto [/mw_shl_code]
简单解释一下,message是消息关键字,这里我定义了两个消息,用于Coprocessor交流,CountRequest是RPC调用端发出的请求消息,CountResponse是服务端返回的消息,然后又定义了一个Service,这个service里可以定义一个远程调用的方法,getsizes,然后调用的参数就是CountRequest消息,返回是CountResponse消息。[default=0]表示的是这些变量默认为0。每个变量后的=1、=2表示的是变量的优先级。

[mw_shl_code=bash,true]option java_package = "com.uestc.coprocessor";
option java_outer_classname = "JAggregateProtocol";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;

message CountRequest {
        required string columns = 1;
}

message CountResponse {
        required int64 qiniu_count = 1 [default = 0];
        required int64 qiniu2_count = 2 [default = 0];
        required int64 jinshan_count = 3 [default = 0];
        optional int64 other_cloud = 4 [default=0];
}

service RowCountService {
  rpc getSizes(CountRequest)
    returns (CountResponse);
}[/mw_shl_code]


写完这个消息后,使用我们刚才安装的protoc工具将其编译生成我们需要的Java代码,使用的命令如下:
[mw_shl_code=bash,true]protoc --java_out=/home/cyber_space/ MultiColumnAggregationService.proto[/mw_shl_code]

这样就会生成我们在.proto文件中指定的JAVA类,类中包含了必要的Service和getter、setter方法等。



3.接下来要写一个类,这个类将在RegionServer端运行,然后被客户端通过RPC框架调用

这个实现类需要继承我们用protoc工具生成的Service类,实现Coprocessor、CoprocessorService接口,以便给HBase的RPC框架调用。start方法我是照着Example写的,这个方法会在HBase启动时,或者是Coprocessor启动时被调用。stop方法可以暂时不实现内容。getService方法返回这个类的对象就可了。getSizes方法就是刚才写在.proto文件的service中的RPC方法。下面详细讲一下getSizes方法。

getSizes方法中可通过request的getter方法获取客户端的消息,这里我就使用了request.getColumns() 获取到CountRequest中定义的columns字符串变量。在getSizes方法中实例化了一个Scan,并设置了若干Filter,来过滤掉不需要的列。通过scanner = env.getRegion().getScanner(scan)方法来实例化一个InternalScanner,这个Scanner比较接近表信息在HBase底层的存储结构。它的next方法返回的是一行的所有Cell,每个Cell是有Key-ColumnFamilly-Qualifier-value组成的,可以用CellUtil类来对Cell做一些常用操作。最后Response对象,可以用构建器模式进行实例化,在RPCCallback中放入done即可。

[mw_shl_code=bash,true]package com.uestc.coprocessor;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;

import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
import com.uestc.coprocessor.JAggregateProtocol.CountRequest;
import com.uestc.coprocessor.JAggregateProtocol.CountResponse;

public class MultiColumnAggregation extends JAggregateProtocol.RowCountService
                implements Coprocessor, CoprocessorService {
        public static final String COLUMNNAME_QINIU = "Qiniu";
        public static final String COLUMNNAME_JINSHAN = "Jinshan";
        public static final String COLUMNNAME_QINIU2 = "Qiniu2";
        private RegionCoprocessorEnvironment env;

        @Override
        public void start(CoprocessorEnvironment arg0) throws IOException {
                if (arg0 instanceof RegionCoprocessorEnvironment) {
                        this.env = (RegionCoprocessorEnvironment) arg0;
                } else {
                        throw new CoprocessorException("Must be loaded on a table region!");
                }
        }

        @Override
        public void stop(CoprocessorEnvironment arg0) throws IOException {

        }

        @Override
        public Service getService() {
                return this;
        }

        @Override
        public void getSizes(RpcController controller, CountRequest request, RpcCallback<CountResponse> done) {
                long[] values = { 0, 0, 0, 0 };
                String columns = request.getColumns();
                if (columns == null || "".equals(columns))
                        throw new NullPointerException("you need specify the columns");
                String[] columnArray = columns.split(";");

                // 设置filter,只过滤出我们需要的列
                Filter filter = null;
                ArrayList<Filter> filters = new ArrayList<>();
                for (String column : columnArray) {
                        filter = new SingleColumnValueFilter(Bytes.toBytes("info"), Bytes.toBytes("location"),
                                        CompareFilter.CompareOp.EQUAL, Bytes.toBytes(column));
                        filters.add(filter);
                }
                filter = new FilterList(FilterList.Operator.MUST_PASS_ONE, filters);
                Scan scan = new Scan();
                scan.addColumn(Bytes.toBytes("info"), Bytes.toBytes("location"));
                scan.addColumn(Bytes.toBytes("info"), Bytes.toBytes("chunksize"));
                scan.setFilter(filter);

                JAggregateProtocol.CountResponse response = null;
                InternalScanner scanner = null;
                try {
                        scanner = env.getRegion().getScanner(scan);
                        List<Cell> results = new ArrayList<Cell>();
                        boolean hasMore = false;
                        do {
                                hasMore = scanner.next(results);
                                if (results.size() < 2)
                                        continue;
                                // chunksize 在前,location在后
                                Cell kv0 = results.get(0);
                                long chunksize = Long.parseLong(Bytes.toString(CellUtil.cloneValue(kv0)));
                                Cell kv1 = results.get(1);
                                String location = Bytes.toString(CellUtil.cloneValue(kv1));
                                switch (location) {
                                case COLUMNNAME_QINIU:
                                        values[0] += chunksize;
                                        break;
                                case COLUMNNAME_QINIU2:
                                        values[1] += chunksize;
                                        break;
                                case COLUMNNAME_JINSHAN:
                                        values[2] += chunksize;
                                        break;
                                default:
                                        break;
                                }
                                results.clear();
                        } while (hasMore);

                        // 生成response
                        response = JAggregateProtocol.CountResponse.newBuilder().setQiniuCount(values[0]).setQiniu2Count(values[1])
                                        .setJinshanCount(values[2]).build();

                } catch (IOException e) {
                        e.printStackTrace();
                        ResponseConverter.setControllerException(controller, e);
                } finally {
                        if (scanner != null) {
                                try {
                                        scanner.close();
                                } catch (IOException ignored) {
                                }
                        }
                }
                done.run(response);
        }

}
[/mw_shl_code]


4.再编写一个客户端类来调用我们刚才写的代码。

客户端代码通过HBase的RPC框架调用HRegionServer上的代码,HRegionServer上的代码完成工作后将结果返回给客户端,客户端汇集各个RegionServer上的操作结果,做最后的处理,如计算各个RegionServer返回结果的和等。我这里的代码如下,仅做一个示例,有很多不完善的地方:

代码中声明了一个RPCCallback,通过RPCCallBack的get方法,就可以得到在RegionServer执行调用获得的结果,各个调用的结果会返回到Map容器中,然后通过对Map容器的迭代,客户端即可获得各个RegionServer上的执行结果,从而进行自定义的计算。

[mw_shl_code=bash,true]package com.uestc.coprocessor;

import java.io.IOException;
import java.lang.reflect.Field;
import java.util.LinkedHashMap;
import java.util.Map;

import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;

import com.google.protobuf.ServiceException;
import com.uestc.util.JsonUtil;
import com.uestc.util.Log;

public class MultiColumnAggregateClient {

        public static class CountInfo {
                public long qiniuCount;
                public long qiniu2Count;
                public long jinshanCount;
                public long otherCount;

                @Override
                public String toString() {
                        String rString = String.format("qiniuCount:%d;Qiniu2Count:%d;JinshanCount:%d;OtherCount:%d",
                                        this.qiniuCount, this.qiniu2Count, this.jinshanCount, this.otherCount);
                        return rString;
                }

                /**
                 * 生成jsonString
                 *
                 * @return
                 * @throws IllegalArgumentException
                 * @throws IllegalAccessException
                 */
                public String toJsonString() throws IllegalArgumentException, IllegalAccessException {
                        Field[] fields = this.getClass().getFields();
                        Map<String, Object> map = new LinkedHashMap<>();
                        for (Field field : fields) {
                                map.put(field.getName(), field.get(this));
                        }
                        return JsonUtil.createJsonString(map);
                }
        }

        public CountInfo getSeveralCounts(Table table, String columnNames) throws ServiceException, Throwable {
                final JAggregateProtocol.CountRequest request = JAggregateProtocol.CountRequest.newBuilder()
                                .setColumns(columnNames).build();
                Map<byte[], CountInfo> map = table.coprocessorService(JAggregateProtocol.RowCountService.class, null, null,
                                new Batch.Call<JAggregateProtocol.RowCountService, CountInfo>() {

                                        @Override
                                        public CountInfo call(JAggregateProtocol.RowCountService aggregate) throws IOException {
                                                BlockingRpcCallback<JAggregateProtocol.CountResponse> rpcCallback = new BlockingRpcCallback<>();
                                                aggregate.getSizes(null, request, rpcCallback);
                                                JAggregateProtocol.CountResponse response = rpcCallback.get();
                                                CountInfo countInfo = new CountInfo();
                                                countInfo.qiniuCount = response.getQiniuCount();
                                                countInfo.qiniu2Count = response.getQiniu2Count();
                                                countInfo.jinshanCount = response.getJinshanCount();
                                                countInfo.otherCount = response.getOtherCloud();
                                                return countInfo;
                                        }
                                });
                CountInfo result = new CountInfo();
                for (CountInfo countInfo : map.values()) {
                        result.qiniuCount += countInfo.qiniuCount;
                        result.qiniu2Count += countInfo.qiniu2Count;
                        result.jinshanCount += countInfo.jinshanCount;
                        result.otherCount += countInfo.otherCount;
                }
                Log.logger.info(result);
                return result;
        }
}
[/mw_shl_code]


5.部署我们的Coprocessor

这里分成两步

(1)、可用Eclipse把MultiColumnAggregation类文件(即需要在RegionServer上运行的类)打成jar包,然后把它分发到各个HBase节点上,并把它们放到HBase目录的lib子目录下,这是最简单的方法;或者也可以去配置CLASSPATH、HBASE_CLASSPATH之类的,保证HBase能访问到我们的jar文件。

(2)、编辑HBASE_HOME/conf/hbase-site.xml文件,配置Coprocessor类

若有多个Coprocessor,则用逗号分隔,value标签中的第一个Coprocessor是HBase自带的,第二个是我们自己自定义的。



[mw_shl_code=bash,true]<property>
  <name>hbase.coprocessor.region.classes</name>
  <value>org.apache.hadoop.hbase.coprocessor.AggregateImplementation,com.uestc.coprocessor.MultiColumnAggregation</value>
</property>[/mw_shl_code]

6.编写调用代码

可以这样来调用刚才我们自己写的Coprocessor:

这里的MTable是我自己封装的Table类,它的getTable方法即返回Table类的实例。


[mw_shl_code=bash,true]public MultiColumnAggregateClient.CountInfo sumUpSizes() throws IOException {
                MultiColumnAggregateClient client = new MultiColumnAggregateClient();
                MultiColumnAggregateClient.CountInfo info = null;
                MTable mTable = null;
                try {
                        mTable = new MTable("file");
                        info = client.getSeveralCounts(mTable.getTable(), "Qiniu;Qiniu2;Jinshan");
                } catch (Throwable e) {
                        Log.logException(e);
                } finally {
                        if (mTable != null)
                                mTable.close();
                }
                return info;
        }[/mw_shl_code]

以上的代码因为项目上赶时间临时写的,有很多不完善的地方,但是也完成了需求,通过这个自定义的Coprocessor,就可以一次调用同时统计多个列的的和了,极大的提高了计算的效率。




已有(1)人评论

跳转到指定楼层
Rommy.Yang 发表于 2016-7-20 18:02:59
file的表结构如果能放上来就更好了.
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条