分享

Hadoop二次开发必备,Hadoop源码分析(五)

本帖最后由 pig2 于 2014-1-16 00:28 编辑

通过上面的一系列介绍,我们知道了DataNode工作时的文件结构和文件结构在内存中的对应对象。下面我们可以来开始分析DataNode上的动态行为。首先我们来分析DataXceiverServer和DataXceiver。DataNode上数据块的接受/发送并没有采用我们前面介绍的RPC机制,原因很简单,RPC是一个命令式的接口,而DataNode处理数据部分,往往是一种流式机制。DataXceiverServer和DataXceiver就是这个机制的实现。其中,DataXceiver还依赖于两个辅助类:BlockSender和BlockReceiver。下面是类图:
13.PNG

(为了简单起见,BlockSender和BlockReceiver的成员变量没有进入UML模型中)DataXceiverServer很简单,它打开一个端口,然后每接收到一个连接,就创建一个DataXceiver,服务于该连接,并记录该连接的socket,对应的实现在DataXceiverServer的run方法里。当系统关闭时,DataXceiverServer将关闭监听的socket和所有DataXceiver的socket,这样就导致了DataXceiver出错并结束线程。DataXceiver才是真正干活的地方,目前,DataXceiver支持的操作总共有六条,分别是:OP_WRITE_BLOCK (80):写数据块OP_READ_BLOCK (81):读数据块OP_READ_METADATA (82):读数据块元文件OP_REPLACE_BLOCK (83):替换一个数据块OP_COPY_BLOCK (84):拷贝一个数据块OP_BLOCK_CHECKSUM (85):读数据块检验码DataXceiver首先读取客户端的版本号并检验,然后再读取一个字节的操作码,并转入相关的子程序进行处理。我们先看一下读数据块的过程吧。首先看输入,下图是读数据块时,客户端发送过来的信息: 14.PNG
包括了要读取的Block的ID,时间戳,开始偏移和读取的长度,最后是客户端的名字(貌似只是在写日志的时候用到了)。根据上面的信息,我们可以创建一个BlockSender,如果BlockSender没有出错,返回客户端一个正确指示后,否则,返回错误码。成功创建BlockSender以后,就可以开始通过BlockSender.sendBlock发送数据。下面我们就来分析BlockSender。BlockSender的构造函数看似很复杂,其实就是根据需求(特别是在处理checksum上,因为checksum是基于块的),打开相应的数据流。close()用于释放各种资源,如已经打开的数据流。sendBlock用于发送数据,数据发送包括应答头和后续的数据包。应答头如下(包含DataXceiver中发送的成功标识):
15.PNG
然后后面的数据就组织成数据包来发送,包结构如下: 16.PNG
各个字段含义:packetLen:包长度,包括包头
offset:偏移量
seqno:包序列号
tail:是否是最后一个包
len:数据长度
checksum:检验数据
data:数据块数据需要注意的,在写数据前,BlockSender会校验数据,保证数据包中的checksum和数据的一致性。同时,如果数据出错,将会有ChecksumException抛出。数据传输结束的标志,是一个packetLen长度为0的包。客户端可以返回一个两字节的应答OP_STATUS_CHECKSUM_OK(5)
---------------------------------------------------------------------------------------------------------------------------------------------------------继续DataXceiver分析,下一块硬骨头是写数据块。HDFS的写数据操作,比读数据复杂N多倍。读数据的时候,只需要在多个数据块文件的选一个读,就可以了;但是,写数据需要同时写到多个数据块文件上,这就比较复杂了。HDFS实现了了Google写文件时的机制,如下图: 17.PNG
数据流从客户端开始,流经一系列的节点,到达最后一个DataNode。图中的所有DataNode只需要写一次硬盘,DataNode1和DataNode2会将从socket上接受到的数据,直接写到到下个节点的socket上。我们来看一下写数据块的请求。
18.PNG
首先是客户端的版本号和一个字节的操作码,接下来是我们熟悉的blockId和generationStamp。参数pipelineSize是整个数据流链的长度,以上面为例,pipelineSize=3。isRecovery指示这次写是否是一次恢复操作,还记得我们在讨论FSDataset.writeToBlock时的那个参数吗?isRecovery来自客户端。client是客户端的名字,就是发起请求的节点名,需要特别注意的是,如果是从NameNode来的复制请求,client为空。hasSrcDataNode是一个标志位,如果被设置,表明源节点是个DataNode,接下来读取的数据就是DataNode的信息。numTargets是目标节点的数目,包括当前节点,以上面的图为例,DataNode1上这个参数值为3,到了DataNode3,就只有1了。targets包含了目标节点的相关信息,根据这些信息,就可以创建到它们上面的socket连接。targets后跟着的是校验头。writeBlock最开始是处理上面提到的消息包,然后创建一个BlockReceiver。接下来就是创建一堆用于读写的流,如下图(图中除了in外,都是在writeBlock中创建,这个图还不涉及在BlockReceiver对本地文件读写的流):
19.PNG
在进行实际的数据写之前,上面的这些流会被建立起来(也就是说,DataNode1到DataNode3都可写以后,才开始处理写数据)。如果其中某一个点出错了,那么,出错的节点名会通过mirrorIn发送回来,一直沿着这条链,传播到客户端。如果一切正常,那么,BlockReceiver.receiveBlock就开始干活了。BlockReceiver的构造函数会创建写数据块和校验数据的输出流。剩下的就交给receiveBlock这个大家伙了。首先receiveBlock会再启动一个线程(一般来说,BlockReceiver就跑在它自己的线程上),用于处理应答(内部类PacketResponder定义了该线程),然后就不断调用receivePacket读数据。数据是以分块的形式传送,格式和读Block的时候是一样的。如下图(很奇怪,为啥不抽象为类): 20.PNG
注意:如果当前DataNode处于数据流的中间,该数据包会发送到下一个节点。接下来的处理,就是处理数据和校验,并分别写到数据块文件和数据块元数据文件。如果出错,抛出的异常会导致receiveBlock关闭相关的输出流,并终止传输。注意,数据校验出错还会上报到NameNode上。PacketResponder用于处理应答。也就是上面讲的mirrorIn和replyOut。PacketResponder里有一个队列ackQueue,receivePacket每收到一个包,都会往队列里添加一项。PacketResponder的run方法,根据工作的DataNode所处的位置,行为不一样。最后一个DataNode由于没有后续节点,PacketResponder的ackQueue每收到一项,表明对应的数据块已经处理完毕,那么就可以发送成功应答。

如果该应答是最后一个包的,PacketResponder会关闭相关的输出流,并提交(前面讲FSDataset时后我们讨论过的finalizeBlock方法)。如果DataNode有后续节点,那么,它必须等到后续节点的成功应答,才可以发送应答到它前面的节点。PacketResponder的run方法还引入了心跳机制,用于检测连接是否还存在。 注意:所有改变DataNode的操作,需要把信息更新到NameNode上,这是通过DataNode.notifyNamenodeReceivedBlock方法,然后通过DataNode统一发送到NameNode上。
下一篇

上一篇




没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条