分享

Hadoop二次开发必备,Hadoop源码分析(六)

本帖最后由 pig2 于 2014-1-16 00:28 编辑

DataXceiver支持的的6条操作,我们已经分析完最重要的两条。剩下的分别是:
OP_READ_METADATA (82):读数据块元文件
OP_REPLACE_BLOCK (83):替换一个数据块
OP_COPY_BLOCK (84):拷贝一个数据块
OP_BLOCK_CHECKSUM (85):读数据块检验码
我们逐个讨论。
读数据块元文件的请求如图(操作码82):
1.PNG


应答很简单,应答码(如OP_STATUS_SUCCESS),文件长度(int),数据。拷贝数据块和替换数据块是一对相对应操作。替换数据块的请求如图(操作码83)。这个比起上面的读数据块元文件请求,有点复杂。替换一个数据块是系统平衡操作的一部分,用于接收一个数据块。它和普通的数据块写的差别是,它只发生在两个节点上,一个写,一个读,而不需要建立数据链。我们可以比较一下它们在创建BlockReceiver对象时的差别:  Java代码
  1. blockReceiver = new BlockReceiver(block, proxyReply,   
  2. proxySock.getRemoteSocketAddress().toString(),   
  3. proxySock.getLocalSocketAddress().toString(),
复制代码
首先,proxyReplyin不一样,这是因为发起请求的节点和提供数据的节点并不是同一个。写数据块发起请求方也提供数据,替换数据块请求方不提供数据,而是提供了一个数据源(proxySource参数),由replaceBlock发起一个拷贝数据块的请求,建立数据源。对于拷贝数据块操作,isRecovery=falseclient=”” srcDataNode=null。注意,我们在分析BlockReceiver是,讨论过client=””的情况,就是应用于这种场景。在创建BlockReceiver对象前,需要利用下面介绍的拷贝数据块的请求建立到数据源的socket连接并发送拷贝数据块请求。然后通过BlockReceiver.receiveBlock接收数据。任务成功后将结果通知notifyNamenodeReceivedBlock。拷贝数据块的请求如图(操作码84)。和读数据块操作请求类似,但是读取的是整个数据块,所以少了很多参数。 读数据块检验码的请求如图(操作码85)。它能够读取某个数据块的检验和的MD5结果,实现的方法很简单。

----------------------------------------------------------------------------------------------------------------------------------------------------


通过上面的讨论,DataNode上的读/写流程已经基本清楚了。我们来看下一个非主流流程,DataBlockScanner用于定时对数据块文件进行校验。类图如下: 2.PNG
DataBlockScanner拥有它单独的线程,能定时地从目前DataNode管理的数据块文件进行校验。其实最重要的方法就是verifyBlock,我们来看这个方法最关键的地方:Java代码
  1. blockSender = new BlockSender(block, 0, -1, false, false, true, datanode);   
  2. DataOutputStream out = new DataOutputStream(new IOUtils.NullOutputStream());   
  3. blockSender.sendBlock(out, null, throttler);  
复制代码
校验利用了BlockSender,因为我们知道BlockSender中,发送数据的同时,会对数据进行校验。verifyBlock只需要读一个Block到一个空输出设备(NullOutputStream),如果有异常,那么校验失败,如果正常,校验成功。DataBlockScanner其他的辅助方法用于对DataBlockScanner管理的数据块文件信息进行增加/删除,排序操作。同时,校验的信息还会保持在Storage上,保存在dncp_block_verification.log.curr和dncp_block_verification.log.prev中。



---------------------------------------------------------------------------------------------------------------------------------------------------










周围的障碍扫清以后,我们可以开始分析类DataNode。类图如下:
3.PNG
public class DataNode extends Configured
    implementsInterDatanodeProtocol, ClientDatanodeProtocol, FSConstants, Runnable






上面给出了DataNode的继承关系,我们发现,DataNode实现了两个通信接口,其中ClientDatanodeProtocol是用于和Client交互的,InterDatanodeProtocol,就是我们前面提到的DataNode间的通信接口。ipcServer(类图的左下方)是DataNode的一个成员变量,它启动了一个IPC服务,这样,DataNode就能提供ClientDatanodeProtocol和InterDatanodeProtocol的能力了。
我们从main函数开始吧。这个函数很简单,调用了createDataNode的方法,然后就等着DataNode的线程结束。createDataNode首先调用instantiateDataNode初始化DataNode,然后执行runDatanodeDaemon。runDatanodeDaemon会向NameNode注册,如果成功,才启动DataNode线程,DataNode就开始干活了。
初始化DataNode的方法instantiateDataNode会读取DataNode需要的配置文件,同时读取配置的storage目录(可能有多个,看storage的讨论部分),然后把这两参数送到makeInstance中,makeInstance会先检查目录(存在,是目录,可读,可写),然后调用:
new DataNode(conf, dirs);
接下来控制流就到了构造函数上。构造函数调用startDataNode,完成和DataNode相关的初始化工作(注意,DataNode工作线程不在这个函数里启动)。首先是初始化一堆的配置参数,什么NameNode地址,socket参数等等。然后,向NameNode请求配置信息(DatanodeProtocol.versionRequest),并检查返回的NamespaceInfo和本地的版本是否一致。
正常情况的下一步是检查文件系统的状态并做必要的恢复,初始化FSDataset(到这个时候,上面图中storage和data成员变量已经初始化)。
然后,找一个端口并创建DataXceiverServer(run方法里启动),创建DataBlockScanner(根据需要在offerService中启动,只启动一次),创建DataNode上的HttpServer,启动ipcServer。这样就结束了DataNode相关的初始化工作。
在启动DataNode工作线程前,DataNode需要向NameNode注册。注册信息在初始化的时候已经构造完毕,包括DataXceiverServer端口,ipcServer端口,文件布局版本号等重要信息。注册成功后就可以启动DataNode线程。
DataNode的run方法,循环里有两种选择,升级(暂时不讨论)/正常工作。我们来看正常工作的offerService方法。offerService也是个循环,在循环里,offerService会定时向NameNode发送心跳,报告系统中Block状态的变化,报告DataNode现在管理的Block状态。发送心跳和Block状态报告时,NameNode会返回一些命令,DataNode将执行这些命令。
心跳的处理比较简单,以heartBeatInterval间隔发送。
Block状态变化报告,会利用保存在receivedBlockList和delHints两个列表中的信息。receivedBlockList表明在这个DataNode成功创建的新的数据块,而delHints,是可以删除该数据块的节点。如在DataXceiver的replaceBlock中,有调用:
datanode.notifyNamenodeReceivedBlock(block,sourceID)
这表明,DataNode已经从sourceID上接收了一个Block,sourceID上对应的Block可以删除了(这个场景出现在当系统需要做负载均衡时,Block在DataNode之间拷贝)。
Block状态变化报告通过NameNode.blockReceived来报告。
Block状态报告也比较简单,以blockReportInterval间隔发送。
心跳和Block状态报告可以返回命令,这也是NameNode先DataNode发起请求的唯一方法。我们来看一下都有那些命令:
  DNA_TRANSFER:拷贝数据块到其他DataNode
  DNA_INVALIDATE:删除数据块(简单方法)
  DNA_SHUTDOWN:关闭DataNode(简单方法)
  DNA_REGISTERDataNode重新注册(简单方法)
  DNA_FINALIZE:提交升级(简单方法)
  DNA_RECOVERBLOCK:恢复数据块
拷贝数据块到其他DataNode由transferBlocks方法执行。注意,返回的命令可以包含多个数据块,每一个数据块可以包含多个目标地址。transferBlocks方法将为每一个Block启动一个DataTransfer线程,用于传输数据。
DataTransfer是一个DataNode的内部类,它利用我们前面介绍的OP_WRITE_BLOCK写数据块操作,发送数据到多个目标上面。
恢复数据块和NameNode的租约(lease)恢复有关,我们后面再讨论。


没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条