分享

HBase 协处理器编程详解第一部分:Server 端代码编写

问题导读

1.编写 Coprocessor 流程和开发环境需要哪些准备?
2.部署协处理器有哪三种方法?







本文详细地介绍了在 HBase0.98.11 版本下编写协处理器的细节,包括环境搭建和代码讲解。所有方法也适用于 HBase 1.0 版本。HBase 社区对于协处理器的文档一直比较缺乏,现有文档仅适用于 0.92 版本,早已过时。希望本文对需要使用新版本 HBase 协处理器的读者有所帮助。


Hbase 协处理器 Coprocessor 简介

HBase 是一款基于 Hadoop 的 key-value 数据库,它提供了对 HDFS 上数据的高效随机读写服务,完美地填补了 Hadoop MapReduce 仅适于批处理的缺陷,正在被越来越多的用户使用。作为 HBase 的一项重要特性,Coprocessor 在 HBase 0.92 版本中被加入,并广受欢迎。本文假设读者对 HBase 以及 Coprocessor 已经比较熟悉,因此并不打算进详细介绍 HBase Coprocessor 的基本概念。不熟悉 Hbase 协处理器原理的读者可以先阅读 HBase 博客上的文章 coprocessor_introduction进行一个基本的了解。

利用协处理器,用户可以编写运行在 HBase Server 端的代码。HBase 支持两种类型的协处理器,Endpoint 和 Observer。Endpoint 协处理器类似传统数据库中的存储过程,客户端可以调用这些 Endpoint 协处理器执行一段 Server 端代码,并将 Server 端代码的结果返回给客户端进一步处理,最常见的用法就是进行聚集操作。如果没有协处理器,当用户需要找出一张表中的最大数据,即 max 聚合操作,就必须进行全表扫描,在客户端代码内遍历扫描结果,并执行求最大值的操作。这样的方法无法利用底层集群的并发能力,而将所有计算都集中到 Client 端统一执行,势必效率低下。利用 Coprocessor,用户可以将求最大值的代码部署到 HBase Server 端,HBase 将利用底层 cluster 的多个节点并发执行求最大值的操作。即在每个 Region 范围内执行求最大值的代码,将每个 Region 的最大值在 Region Server 端计算出,仅仅将该 max 值返回给客户端。在客户端进一步将多个 Region 的最大值进一步处理而找到其中的最大值。这样整体的执行效率就会提高很多。

另外一种协处理器叫做 Observer Coprocessor,这种协处理器类似于传统数据库中的触发器,当发生某些事件的时候这类协处理器会被 Server 端调用。Observer Coprocessor 就是一些散布在 HBase Server 端代码中的 hook 钩子,在固定的事件发生时被调用。比如:put 操作之前有钩子函数 prePut,该函数在 put 操作执行前会被 Region Server 调用;在 put 操作之后则有 postPut 钩子函数。

HBase 协处理器用途广泛,然而 HBase 的各种文档却没有关于其编程方法的详细介绍,给入门的新手带来很大的障碍。在前文提及的经典文章 coprocessor_introduction中有关于如何编写 Coprocessor 的一些描述,可惜该文章发表于 HBase 0.92 的流行时期,其编程方法在新版本的 HBase 中已经无法使用。如今广泛使用的是 0.98 版本,甚至更新的 1.0 版本,读者如果按照上文中的方法进行尝试,一定会摸不着头脑。
本文将实现两个具体的 Coprocessor,来分别讲述如何编写 0.98 版本 HBase 的协处理器,基本方法对于 HBase 1.0 版本也适用。




编写 Coprocessor 流程概述和开发环境准备
本文作者采用的操作系统为 CentOS 6.5,采用其他的 Linux 发行版也可以进行 HBase 开发,不过准备工作的细节稍有不同。不过总的说来需要以下这三个主要的工具:
  • JDK 1.6 以上版本
  • Hbase 0.98
  • Google Protobuf 2.5.0
HBase 0.98 可以使用 Java 6 或 Java 7,而到了 1.0 版本就必须使用 Java 7。但是 Java 8 则有一些问题。因此推荐大家直接使用 Java 7 的 JDK 进行开发。


安装 JDK1.7
在 Oracle 官网下载 JDK。
图 1. 下载 JDK

img001.jpg


下载完成后解压,并移动到$HOME/tools 目录下。$HOME/tools 是我自己创建的一个目录,用来存放所有开发本文实例所需要的工具。
[mw_shl_code=bash,true]tar -xzvf jdk-7u67-linux-x64.tar.gz
mv –r jdk1.7.0_67 $HOME/tools[/mw_shl_code]

如果有 root 或者 sudo 的权限,您也可以直接下载 rpm 包,然后用 rpm 命令安装。本文做最一般性的假设,因此假设您没有 root 的权限。
将下载文件解压后,还需要修改环境变量。将下面的 export 语句加入.bashrc 或者.profile 中均可。这样下次登录时这些环境变量将自动生效。

[mw_shl_code=bash,true]export JAVA_HOME=$HOME/tools/jdk1.7.0_67/
export PATH=$JAVA_HOME/bin:$PATH[/mw_shl_code]

安装 Google Protobuf
老版本的 HBase(即 HBase 0.96 之前) 采用 Hadoop RPC 进行进程间通信。在 HBase 0.96 版本中,引入了新的进程间通信机制 protobuf RPC,基于 Google 公司的 protocol buffer 开源软件。HBase 需要使用 Protobuf 2.5.0 版本。这里简单介绍其安装过程:
首先需要下载 Protobuf 2.5.0 版本的源代码安装包,如果无法访问,可以在 csdn 找到下载。
[mw_shl_code=bash,true]wget href="https://protobuf.googlecode.com/files/protobuf-2.5.0.tar.bz2
tar xjvf protobuf-2.5.0.tar.bz2[/mw_shl_code]
确保您已经安装了 gcc 和 gcc-c++包,然后进行编译安装:

[mw_shl_code=bash,true]mkdir $HOME/tools/protobuf-2.5.0
./configure --prefix=$HOME/tools/protobuf-2.5.0
make;make install[/mw_shl_code]

编译成功后编辑环境变量,加入 protoc 的路径
[mw_shl_code=bash,true]export PROTO_HOME=$HOME/tools/protobuf-2.5.0
export PATH=$PROTO_HOME:$PATH[/mw_shl_code]


安装 Maven
本文采用 Maven 进行 Java 工程创建和编译,因此需要安装 Maven。您也可以采用其他您所喜欢的 Java 开发工具。
在 Maven 的官方网站可以下载 Maven 的二进制包,选择版本 3 以上的均可,本文采用最新的 Maven3.3.1。
图 2. 下载 Maven
img002.jpg

下载完毕后解压。将解压后的二进制文件夹移动到$HOME/tools 下,以便于将来清理环境。最后修改环境变量即可。

[mw_shl_code=bash,true]tar -xzvf apache-maven-3.3.1-bin.tar.gz
mv apache-maven-3.3.1 $HOME/tools
vi .bashrc
export MAVEN_HOME=$HOME/tools/apache-maven-3.3.1
export PATH=$MAVEN_HOME/bin:$PATH[/mw_shl_code]

安装 HBase
如果您已经安装了 HBase,并且其版本高于 0.96,那么请略过本节。写作本文时,HBase0.98 的最新版本为 0.98.11,因此这里简单介绍 HBase 0.98.11 版本的安装,本文的示例程序也将在这个版本的 HBase 中部署运行。
本文介绍的协处理器编写方法可以在任何高于 0.96 版本的 HBase 上运行,包括 HBase 1.0。0.94 版本的协处理器开发方法有所不同,在前文提及的经典文章 coprocessor_introduction中有详细介绍,互联网上也有大量的文章讲述老版本的协处理器编写方法。本文不再赘述,而着重讲述变化后的 HBase 协处理器开发方法。
在 HBase 网站下载:

图 3. 下载 HBase

img003.jpg

下载完毕照例解压,修改环境变量。

[mw_shl_code=bash,true]tar xzvf hbase-0.98.11-hadoop2-bin.tar.gz
mv hbase-0.98.11-hadoop2
vi ~/.bashrc
export HBASE_HOME=$HOME/tools/hbase-0.98.11-hadoop2
export PATH=$HBASE_HOME/bin:$PATH[/mw_shl_code]

为了运行本文中的实例,我们仅需要 HBase 运行在 standalone 模式即可。为此,无需修改任何配置,直接启动 HBase 即可。
[mw_shl_code=bash,true]start-hbase.sh[/mw_shl_code]

用 jps 命令查看,应该有一个 HMaster 的进程在运行。如果看到该进程,那么恭喜,您已经建立了一个完整的开发环境,足以满足本文的需要了。
进入细节之前,我们先从整体上了解一下开发 HBase 协处理器的流程。对于 Endpoint 类型的协处理器,其开发流程如下:

第一步是建立一个 Java 工程;第二步是定义用户 ClientHBase 通信的 RFC,采用 Protobuf 语言和工具完成定义;第三步是编写 HBase 协处理器的 Client 端和 Server 端代码,其中,Client 端代码负责调用协处理器并处理返回结果,Server 端代码将运行在 Region Server 上,实现具体的任务;最后需要对编译好的代码进行部署和测试。

对于 Observer 类型的协处理器,不需要定义 RPC,也不需要开发客户端代码。当相应的事件发生时,Observer 代码将自动在 Server 端执行。因此仅仅需要编写 Server 端的代码即可。

一个应用实例
本文将通过一个具体实例,来演示两种协处理器的开发方法的详细实现过程。

在管理 HBase 应用的过程中,笔者常想知道某个 Region 中数据行的个数总和,即 row count,或者整个 table 的数据量。本文将用“行数”来指称 row count。可以用 HBase Shell 的 count 命令来获取某张表的数据量。不过这是一个全表扫描过程,非常浪费资源,也很慢;另外一方面,还没有一个快捷的方法来获得单个 Region 的行数。

为此,我打算利用 Coprocessor 来实现一个简单的工具来帮助我实现以上的需求。其工作原理如下:
利用 Observer 协处理器在每一次 put 操作时,将统计该 Region 的行数,并保存在一个计数器中;在每一次 delete 操作时,将该计数器减 1。利用 Endpoint 协处理器,将该计数器的数值返回给 Client 端调用;为了在 Observer 和 Endpoint 协处理间共享行数计数器,我们将该计数器保存在 ZooKeeper 中。在客户端,调用 Endpoint 协处理器获取指定 Region 的行数计数器,并将所有的返回值求和即可。基本过程如下图所示:
图 4. 整体流程
img004.jpg



创建 maven 工程
创建一个工程代码目录,用 {PROJECT_HOME} 来代表您创建的目录,后续开发的所有代码都将放在这里:
[mw_shl_code=bash,true]$ mkdir $PROJECT_HOME
[/mw_shl_code]

用如下 Maven 命令创建工程:
[mw_shl_code=bash,true]$ cd $PROJECT_HOME
$ mvn archetype:generate -DgroupId=org.ibm.developerworks -DartifactId=regionCount
                                -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false[/mw_shl_code]

Endpoint 协处理器

在本文中,Endpiont 协处理器的工作十分简单。仅仅返回 Region 的行数计数器即可,可以归纳为:读取一个值,然后返回它。但是即便是如此简单的一个操作,为了实现它,必须首先编写协处理器的框架。本文试图为大家提供一个尽量完整的参考。对于有经验的 Java 开发人员,以下的描述恐怕略显啰嗦,还请见谅。

用 Protobuf 编写和定义 RPC

如前所述 Endpoint 协处理器读取 Region 的行数计数器,然后将该值返回给调用的客户端。因此 RPC 需要一个整数类型的返回值代表行数。仅仅返回行数的情况下,客户端并不需要为 RPC 定义任何输入参数,不过为了演示输入和输出,我们额外为这个 RPC 设计了一个输入参数:reCount。这个参数是一个布尔变量,当为 true 时,表明用户需要 Endpoint 扫描遍历 Region 来计算行数;当其为 false,表示直接使用 Observer 协处理器维护的计数器。前者需要扫描整个 Region,非常慢;后者效率很高。
通过这种方法,我们也可以检验数据的正确性,因为遍历 Region 得到的行数是最准确的。最终的 RPC 定义如下。


清单 1. getRowCount RPC proto 定义

[mw_shl_code=bash,true]option java_package = "org.ibm.developerworks";

option java_outer_classname = "getRowCount";
option java_generic_services = true;
option optimize_for = SPEED;

message getRowCountRequest{
required bool reCount = 1;
}

message getRowCountResponse {
optional int64 rowCount = 1;
}


service ibmDeveloperWorksService {
rpc getRowCount(getRowCountRequest)
returns(getRowCountResponse);
}[/mw_shl_code]

将以上代码保存为文件 ibmDeveloperworksDemo.proto。可以看到,这里定义了一个 RPC,名字叫做 getRowCount。该 RPC 有一个入口参数,用消息 getRowCountRequest 表示;RPC 的返回值用消息 getRowCountResponse 表示。Service 是一个抽象概念,RPC 的 Server 端可以看作一个 Service,提供某种服务。在 HBase 协处理器中,Service 就是 Server 端需要提供的 Endpoint 协处理器服务,可以为 HBase 的客户端提供服务。在一个 Service 中可以提供多个 RPC,在本文中,我们仅仅定义了一个 RPC,实际工作中往往需要定义多个。
将该文件存放在工程的 src/main/protobuf 目录下。

[mw_shl_code=bash,true]$ mkdir $PROJECT_HOME/rowCount/src/main/protobuf
$ mv ibmDeveloperworksDemo.proto $PROJECT_HOME/rowCount/src/main/protobuf[/mw_shl_code]
用 Protobuf 编译器将该 proto 定义文件编译为 Java 代码,并放到 Maven 工程下。
[mw_shl_code=bash,true]$ cd $PROJECT_HOME/rowCount/src/main/protobuf
$ protoc --java_out=$PROJECT_HOME/rowCount/src/main/java ibmDeveloperworksDemo.proto[/mw_shl_code]

现在可以看到在工程的 src/main/java/org/ibm/developerworks 目录下生成了一个名为 getRowCount.java 的文件。这个 Java 文件就是 RPC 的 Java 代码,在后续的 Server 端代码和 Client 端代码中都要用到这个 Java 文件。
为了编译新生成的 Protobuf Java 代码,我们还需要修改 Maven 的 pom.xml 文件,加入对 protobuf-2.5.0 的依赖,这样 Maven 就可以自动下载相应的 jar 包,完成编译。
在 pom.xml 文件中加入如下的内容即可:
清单 2. Protobuf 在 pom.xml 中的依赖

[mw_shl_code=bash,true]<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>2.5.0</version>
</dependency>[/mw_shl_code]
现在可以尝试进行第一编译了:
[mw_shl_code=bash,true]mvn clean compile
[/mw_shl_code]


如果出现错误,您需要仔细查看代码是否在编辑的时候出错。本文的附件中有所有的示例代码,仅供参考。

实现 Server 端代码
在工程目录的 src/main/java/org/ibm/developerworks/下建立一个 coprocessor 的目录,存放我们即将开发的 Server 端 Endpoint 协处理器代码。

[mw_shl_code=bash,true]$ mkdir $PROJECT_HOME/rowCount/src/main/java/org/ibm/developerworks/coprocessor
[/mw_shl_code]


Server 端的代码就是一个 Protobuf RPC 的 Service 实现,即通过 Protobuf 提供的某种服务。其开发内容主要包括:
  • 实现 Coprocessor 的基本框架代码
  • 实现服务的 RPC 具体代码




Endpoint 协处理的基本框架
Endpoint 是一个 Server 端 Service 的具体实现。它的实现有一些框架代码,这些框架代码与具体的业务需求逻辑无关。仅仅是为了和 HBase 的运行时环境协同工作而必须遵循和完成的一些粘合代码。因此多数情况下仅仅需要从一个例子程序拷贝过来并进行命名修改即可。不过我们还是完整地对这些粘合代码进行粗略的讲解以便更好地理解代码。
首先 Endpoint 协处理器是一个 Protobuf Service 的实现,因此需要它必须继承某个 Protobuf Service。我们在前面已经通过 proto 文件定义了 Service,命名为“ibmDeveloperWorksService”,因此 Server 端代码需要重载该类,下图中相关代码用红色方框着重显示:
图 5. Endpoint 协处理器框架代码--父类 img005.jpg
其次,作为一个 HBase 的协处理器,Endpoint 还必须实现 HBase 定义的协处理器协议,用 Java 的接口来定义。具体来说就是CoprocessorServiceCoprocessor,这些 HBase 接口负责将协处理器和 HBase 的 RegionServer 等实例联系起来,以便协同工作。下图中相关代码用红色方框着重显示:
图 6. Endpoint 协处理器框架代码--接口 img006.jpg
Coprocessor 接口定义了两个接口函数,start 和 stop。
协处理器在 Region 打开的时候被 RegionServer 自动加载,并会调用器 start 接口,完成初始化工作。一般的该接口函数中仅仅需要将协处理器的运行上下文环境变量 CoprocessorEnviorment保存到本地即可。


清单 3.start 接口

[mw_shl_code=bash,true]//这两个类成员是后续代码用来操作 ZooKeeper 的,在 start() 中进行初始化
private String zNodePath = "/hbase/ibmdeveloperworks/demo";
private ZooKeeperWatcher zkw = null;

@Override
public void start(CoprocessorEnvironment env) throws IOException {
if (env instanceof RegionCoprocessorEnvironment) {
this.re = (RegionCoprocessorEnvironment) env;
RegionServerServices rss = re.getRegionServerServices();
//获取 ZooKeeper 对象,这个 ZooKeeper 就是本 HBase 实例所连接的 ZooKeeper
zkw = rss.getZooKeeper();
//用 region name 作为 znode 的节点名后缀
zNodePath=zNodePath+re.getRegion().getRegionNameAsString();
} else {
throw new CoprocessorException("Must be loaded on a table region!");
}
}[/mw_shl_code]

CoprocessorEnviorment 保存了协处理器的运行环境,每个协处理器都是在一个 RegionServer 进程内运行,并隶属于某个 Region。通过该变量,可以获取 Region 的实例等 HBase 的运行时环境对象。

当需要在协处理器内调用 HBase 的服务或者 API 时,就必须通过该变量获取相应的 HBase 内部对象实例完成相关的操作。我们将在后续的 RPC 实现代码中给出详细的例子。

在 start 函数中,我们还初始化了一个 ZooKeeperWatcher,在文章的后续部分中,我们将详细介绍这个对象的用途。
Coprocessor 接口还定义了 stop() 接口函数。该函数在 Region 被关闭时调用,用来进行协处理器的清理工作。在本文中,我们没有任何清理工作,因此该函数什么也不干。

清单 4. stop 接口

[mw_shl_code=bash,true] @Override
public void stop(CoprocessorEnvironment env) throws IOException {
// nothing to do
}[/mw_shl_code]

我们的协处理器还需要实现 CoprocessorService 接口。该接口仅仅定义了一个接口函数 getService()。我们仅需要将本实例返回即可。HBase 的 RegionServer 在接受到客户端的调用请求时,将调用该接口获取实现了 RPC Service 的实例,因此本函数一般情况下就是返回自身实例即可。

清单 5. getService 接口

[mw_shl_code=bash,true] /**
* Just returns a reference to this object, which implements the RowCounterService interface.
*/
@Override
public Service getService() {
return this;
}[/mw_shl_code]

完成了以上三个接口函数之后,Endpoint 的框架代码就完成了。每个 Endpoint 协处理器都必须实现这些框架代码,而且写法雷同。
Endpoint 协处理器真正的业务代码都在每一个 RPC 函数的具体实现中。
在本文中,我们的 Endpoint 协处理器仅提供一个 RPC 函数,即 getRowCount。我将分别介绍编写该函数的几个主要工作:了解函数的定义,参数列表;处理入口参数;实现业务逻辑;设置返回参数。

函数定义
函数 getRowCount 在 Server 端的函数定义如下。
[mw_shl_code=bash,true]public void getRowCount(RpcController controller, getRowCount.getRowCountRequest request,
RpcCallback<getRowCount.getRowCountResponse> done)[/mw_shl_code]

每一个 RPC 函数的参数列表都是固定的,有三个参数。第一个参数 RpcController 是固定的,所有 RPC 的第一个参数都是它,这是 HBase 的 Protobuf RPC 协议定义的;第二个参数为 RPC 的入口参数;第三个参数为返回参数。入口和返回参数分别由代码清单 1 的 proto 文件中的 getRowCountRequest 和 getRowCountResponse 定义。

分析入口参数
request 包含了入口参数,从 proto 定义中可以知道,这个入口参数只有一个 field,布尔类型的 reCount。我们将该参数从 Protobuf 消息中反序列化:

[mw_shl_code=bash,true]boolean reCount=request.getReCount();
[/mw_shl_code]

如果您编写的 RPC 包含多个 field,每一个 field 都可以通过 request.getXXX() 函数来获得,其中 XXX 表示 field 的名字。

实现函数逻辑
我们的 RPC 的主要业务逻辑为获得 Region 的行数,当 reCount 为 true 时,需要遍历 Region 然后对结果集进行计数来获得行数;当 reCount 为 false 时,直接读取表示行数的变量。

我们先来看遍历 Region 的方法,这是最经典的行数统计实现,在 HBase 代码的 example 目录下也有现成的例子。通过 Scan 操作遍历 Region 中的每一行数据,在循环内将计数器累加即可。下面的代码清单中将错误处理部分去掉,以便很好地显示主要逻辑。

清单 6. 采用 scan 方法获取行数

[mw_shl_code=bash,true]long getTableRowCountBatch(String tableName) {
try{
//连接 Hbase
Configuration config = new Configuration();
HConnection connection = HConnectionManager.createConnection(config);
HTableInterface table = connection.getTable(tableName);


//设置 request 参数
org.ibm.developerworks.getRowCount.getRowCountRequest.Builder builder =
getRowCountRequest.newBuilder();
builder.setReCount(false);

//开始和结束 rowkey
byte[] s= Bytes.toBytes("r1");
byte[] e= Bytes.toBytes("t1");
//调用 batchCoprocessorService
results = table.batchCoprocessorService(
ibmDeveloperWorksService.getDescriptor().findMethodByName("getRowCount"),
builder.build(),s, e,
getRowCountResponse.getDefaultInstance());
}
Collection<getRowCountResponse> resultsc = results.values();
for( getRowCountResponse r : resultsc)
{
totalRowCount += r.getRowCount();
}

return totalRowCount;
}[/mw_shl_code]

我们在前文定义了 Protobuf,其中 ibmDeveloperWOrksService 是我们定义的 Service,通过其 getDescriptor() 方法可以找到 Service 的描述符。RPC 方法在 Service 中定义,因此可以用 Service 描述符的 findMethodByName 方法找到具体的方法描述符,该描述符作为 batchCoprocessorService 的第一个参数,以便该方法可以知道调用哪个 RPC。关于 HBase 如何使用 Protobuf 实现 RPC 是一个大的话题,本文无法展开说明,感兴趣的读者可以进一步自行研究。

接下来需要给出 RPC 的入口参数和返回参数类型。和前面直接使用 coprocessorService 一样,还需要指定开始和结束的 rowkey,以便该方法找到正确的 Region 列表。直到 1.0 版本,HBase 在这里还有一些 bug,即 startkey 和 endkey 不能指定为 null。虽然 javadoc 中指明如果为 null,表示全表。但笔者在 0.98.11 版本中无法使用 null 参数,会执行出错。读者可以关注 HBASE-13417,看看什么时候可以正常使用 null 作为参数。
如上所述,一般情况下,使用 Endpoint 协处理器的频率不会太高。HBase 是一个存储数据的系统,最常用的应该是 get 和 put,如果频繁使用协处理器,也许说明您应该考虑其他的数据库系统。因此实践中,笔者尚未曾见过调用 batchCorpcoessorService 的例子。也可以理解为什么这个方法还存在如此低级的 bug。所以笔者不推荐读者使用该方法。

至此主要的客户端代码都已经实现,接下来我们需要编译并部署和执行。
我们将客户端代码也加在同一个 Maven 工程中,最终生成一个 jar 文件,包括了测试程序和协处理器代码。您也可以为客户端代码另外创建一个 Maven 工程。读者随意。

在本文中,最终的 jar 包为 rowCount.jar。


部署和运行

部署协处理器有三种方法。在 HBase 博客上的文章 coprocessor_introduction 中介绍了其中两种,即修改 hbase-site.xml 文件和通过HBase Shell命令的方法。本文就不再重复。
第三种方法是通过 HBase API,用编程的方法为指定 table 部署协处理器。相应的 API 为 HTableDescriptor.addCoprocessor(String className)。本文将介绍这种方法。
代码清单 7 演示了如何创建测试用表的过程,通过调用 HTableDescriptor 的 addCoprocessor 方法为相应的 HBase 表部署了我们前文开发的两个协处理器。这段代码作为客户端代码的初始化部分,读者可以下载本文的附件获得完整代码。
清单 7. 利用 Java 代码实现协处理器部署
[mw_shl_code=bash,true]boolean createTable(string tableName) {
//HBase 1.0 创建 Table
Configuration config = new Configuration();
Table table = null;
TableName TABLE = TableName.valueOf(tableName);
Admin admin= new Admin(config);
HTableDescriptor tableDesc = new HTableDescriptor(TABLE);

//HBase 0.98 创建 Table
Configuration config = new Configuration();
HBaseAdmin admin = new HBaseAdmin(config);
HTableDescriptor tableDesc = new HTableDescriptor(tableName);
//添加 coprocessor
tableDesc.addCoprocessor(“org.ibm.developerworks.coprocessor.getRowCountEndpoint”);
tableDesc.addCoprocessor(“org.ibm.developerworks.coprocessor.rowCountObserver”);

//省去其他的 HTableDescriptor 操作代码
...
//创建表
admin.createTable(tableDesc);

}[/mw_shl_code]

采用 Java 编程的方法部署协处理器,不需要对已经运行的 HBase 实例做任何修改,但是为了 HBase 能够加载协处理器的 jar 包,我们必须将其拷贝到 HBase 实例的 CLASSPATH 所指定的目录下。由于本文使用的是 HBase 的 standalone 模式,最简单的方法是将 jar 包放到$HBASE_HOME/lib 目录下。在真实的分布式环境下,需要将 jar 包拷贝到每台集群节点的$HBASE_HOME/lib 中,也有些公司将 jar 包上传到 HDFS 的指定目录,这样每台 RegionServer 都可以通过 HDFS 读取。具体的协处理器部署方法在 HBase0.96 之后和之前的版本都没有区别,读者可以参考前文提及的经典协处理器文章进一步了解。

本文的测试程序用法
本文附件中提供了一个完整的测试程序,用来演示本文中所有的代码功能。该测试程序创建用户输入的表,并自动采用 API 方法部署协处理器。然后,测试程序将首先向测试表插入 1000 条数据。然后根据用户输入,执行 N 条 delete 命令,保证最终表内的行数为用户指定的数字。
准备好数据之后,测试代码将分别使用本文介绍的三种客户端方法调用协处理器,并分析和输出结果。
在命令行直接运行例子程序:

[mw_shl_code=bash,true]java hbaseCoprocessorDemo test 1 666 rowkey
[/mw_shl_code]

测试程序执行,创建 HBase 表”test”,第二个参数 1 的意思是采用 slow 方法获取 rowcount;如果第二个参数为 0,则例子代码会采用快速方法获取 rowcount;第三个参数即最终期望的 rowcount 个数;第四个参数表示是否进行全表扫描,如果用户将该参数省略,则进行全表 rowcount 统计,否则该参数表示 rowkey,程序就会去统计该 rowkey 所在 Region 上的 rowcount。

关于例子程序的运行细节本文就不再详述,读者可以下载附件查看更多细节,例子程序非常粗糙,仅为了演示基本的 Coprocessor 编程技术。读者可以自行优化和修改。

为了读者实验方便,附件代码包还提供了两个脚本:deploy.sh 进行代码编译和部署;test.sh 进行程序调用。读者可以直接使用这两个脚本进行测试。



调试
至此本系列告一段落,通过详细介绍开发协处理器的各种细节,希望能够对大家有所帮助。不过每个程序员都知道,多数情况下,编写代码本身并不会花费太多精力,最耗费时间的就是调试。然而对协处理器代码的调试是比较困难的。笔者也没有好的经验推荐给大家,唯一的调试手段就是通过日志打印,然后分析日志信息。

HBase 本身采用 log4j 进行日志打印,因此可以在 Coprocessor 中使用同样的日志方法,将信息打印到 HBase 的 Region Server 日志文件中,然后通过查看日志文件进而了解协处理器的运行情况。

使用 log4j 的方法非常简单,在类定义中加入 LOG 对象,并初始化。

[mw_shl_code=bash,true]private static final Log LOG = LogFactory.getLog(RowCountObserver.class);
[/mw_shl_code]

此后,就可以在需要的地方进行打印了,即调用 LOG.info(“”) 即可。非常简单,读者可以参考附件中的代码,在此就不再赘述了。


结束语
HBase 的协处理器用途广泛,但是 HBase 的文档中对协处理器编程的细节却缺乏实用性的编程方法描述,希望本文能够为广大 HBase 用户提供一个较为详细的入门介绍。由于作者水平有限,文中可能有错误和不妥的地方,还希望读者不吝赐教。





已有(3)人评论

跳转到指定楼层
qiuyusir 发表于 2016-2-5 11:11:06
有完整代码吗,想学习一下
回复

使用道具 举报

duojiu 发表于 2016-10-19 10:51:38
为什么在集群上运行endpoint的 rowcount 出来的数据和直接在hbase shell count出来的数据是不一致,endpoint 运行一次 rowcount数都不一样。
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条