分享

Hadoop二次开发必备,Hadoop源码分析(十五)

本帖最后由 pig2 于 2014-1-16 00:23 编辑

我们可以开始从系统的外部来了解HDFS了,DFSClient提供了连接到HDFS系统并执行文件操作的基本功能。DFSClient也是个大家伙,我们先分析它的一些内部类。我们先看LeaseChecker。租约是客户端对文件写操作时需要获取的一个凭证,前面分析NameNode时,已经了解了租约,INodeFileUnderConstruction的关系,INodeFileUnderConstruction只有在文件写的时候存在。客户端的租约管理很简单,包括了增加的put和删除的remove方法,run方法会定期执行,并通过ClientProtocl的renewLease,自动延长租约。

接下来我们来分析内部为文件读引入的类。

1.PNG


InputStream是系统的虚类,提供了3个read方法,一个skip(跳过数据)方法,一个available方法(目前流中可读的字节数),一个close方法和几个在输入流中做标记的方法(mark:标记,reset:回到标记点和markSupported:能力查询)。
FSInputStream也是一个虚类,它将接口Seekable和PositionedReadable混插到类中。Seekable提供了可以在流中定位的能力(seek,getPos和seekToNewSource),而PositionedReadable提高了从某个位置开始读的方法(一个read方法和两个readFully方法)。
FSInputChecker在FSInputStream的基础上,加入了HDFS中需要的校验功能。校验在readChecksumChunk中实现,并在内部的read1方法中调用。所有的read调用,最终都是使用read1读数据并做校验。如果校验出错,抛出异常ChecksumException。
有了支持校验功能的输入流,就可以开始构建基于Block的输入流了。我们先回顾前面提到的读数据块的请求协议:

2.PNG


然后我们来分析一下创建BlockReader需要的参数,newBlockReader最复杂的请求如下:   public staticBlockReader newBlockReader( Socket sock, String file,
                                      longblockId,
                                       longgenStamp,
                                      longstartOffset,long len,
                                       intbufferSize, boolean verifyChecksum,
                                      StringclientName)
                                      throws IOException其中,sock为到DataNode的socket连接,file是文件名(只是用于日志输出),其它的参数含义都很清楚,和协议基本是一一对应的。该方法会和DataNode进行对话,发送上面的读数据块的请求,处理应答并构造BlockReader对象(BlockReader的构造函数基本上只有赋值操作)。BlockReader的readChunk用于处理DataNode送过来的数据,格式前面我们已经讨论过了,如下图。
3.PNG

读数据用的read,会调用父类FSInputChecker的read,最后调用readChunk,如下:
4.PNG

read如果发现读到正确的校验码,则用过checksumOk方法,向DataNode发送成功应达。

BlockReader的主要流程就介绍完了,接下来分析DFSInputStream,它封装了DFSClient读文件内容的功能。在它的内部,不但要处理和NameNode的通信,同时通过BlockReader,处理和DataNode的交互。

DFSInputStream记录Block的成员变量是:   

private LocatedBlocks locatedBlocks = null;

它不但保持了文件对应的Block序列,还保持了管理Block的DataNode的信息,是DFSInputStream中最重要的成员变量。DFSInputStream的构造函数,通过类内部的openInfo方法,获取这个变量的值。openInfo间接调用了NameNode的getBlockLocations,获取LocatedBlocks。

DFSInputStream中处理数据块位置的还有下面一些函数:   

synchronized List<LocatedBlock> getAllBlocks() throws IOException   
private LocatedBlock getBlockAt(long offset) throws IOException   
private synchronized List<LocatedBlock> getBlockRange(long offset, long length)   
private synchronized DatanodeInfo blockSeekTo(long target) throws IOException

它们的功能都很清楚,需要注意的是他们处理过程中可能会调用再次调用NameNode的getBlockLocations,使得流程比较复杂。blockSeekTo还会创建对应的BlockReader对象,它被几个重要的方法调用(如下图)。在打开到DataNode之前,blockSeekTo会调用chooseDataNode,选择一个现在活着的DataNode。
5.PNG

通过上面的分析,我们已经知道了在什么时候会连接NameNode,什么时候会打开到DataNode的连接。下面我们来看读数据。read方法定义如下:
    public int read(long position, byte[] buffer, int offset, int length)
该方法会从流的position位置开始,读取最多length个byte到buffer中offset开始的空间中。参数检测完以后,通过getBlockRange获取要读取的数据块对应的block范围,然后,利用fetchBlockByteRange方法,读取需要的数据。
fetchBlockByteRange从某一个数据块中读取一段数据,定义如下:
    private void fetchBlockByteRange(LocatedBlock block, long start, long end, byte[] buf, int offset)
由于读取的内容都在一个数据块内部,这个方法会创建BlockReader,然后利用BlockReader的readAll方法,读取数据。读的过程中如果发生校验错,那么,还会通过reportBadBlocks,向NameNode报告校验错。
另一个读方法是:
    public synchronized int read(byte buf[], int off, int len) throws IOException
它在流的当前位置(可以通过seek方法调整)读取数据。首先它会判断当前流的位置,如果已经越过了对象现在的blockReader能读取的范围(当上次read读到数据块的尾部时,会发生这中情况),那么通过blockSeekTo打开到下一个数据块的blockReader。然后,read在当前的这个数据块中通过readBuffer读数据。主要,这个read方法只在一块数据块中读取数据,就是说,如果还有空间可以存放数据但已经到了数据块的尾部,它不会打开到下一个数据块的BlockReader继续读,而是返回,返回值包含了以读取数据的长度。
DFSDataInputStream是一个Wrapper(DFSInputStream),我们就不讨论了。


----------------------------------------------------------------------------------------------------------------------------------------------------


接下来当然是分析输出流了。
处于继承体系的最上方是OutputStream,它实现了Closeable(方法close)和Flushable(方法flush)接口,提供了3个不同形式的write方法,这些方法的含义都很清楚。接下来的是FSOutputSummer,它引入了HDFS写数据时需要的计算校验和的功能。FSOutputSummer的write方法会调用write1,write1中计算校验和并将用户输入的数据拷贝到对象的缓冲区中,缓冲区满了以后会调用flushBuffer,flushBuffer最终调用还是虚方法的writeChunk,这个时候,缓冲区对应的校验和缓冲区对的内容都已经准备好了。通过这个类,HDFS可以把一个流转换成为DataNode数据接口上的包格式(前面我们讨论过这个包的格式,如下)。


6.PNG




DFSOutputStream继承自FSOutputSummer,是一个非常复杂的类,它包含了几个内部类。我们先分析Packet,其实它对应了上面的数据包,有了上面的图,这个类就很好理解了,它的成员变量和上面数据块包含的信息基本一一对应。构造函数需要的参数有pktSize,包的大小,chunksPerPkt,chunk的数目(chunk是一个校验单元)和该包在Block中的偏移量offsetInBlock。writeData和writeChecksum用于往缓冲区里写数据/校验和。getBuffer用户获得整个包,包括包头和数据。
DataStreamer和ResponseProcessor用于写包/读应答,和我们前面讨论DataNode的Pipe写时类似,客户端写数据也需要两个线程,下图扩展了我们在讨论DataNode处理写时的示意图,包含了客户端:


7.PNG


DataStreamer启动后进入一个循环,在没有错误和关闭标记为false的情况下,该循环首先调用processDatanodeError,处理可能的IO错误,这个过程比较复杂,我们在后面再讨论。
接着DataStreamer会在dataQueue(数据队列)上等待,直到有数据出现在队列上。DataStreamer获取一个数据包,然后判断到DataNode的连接是否是打开的,如果不是,通过DFSOutputStream.nextBlockOutputStream打开到DataNode的连接,并启动ResponseProcessor线程。
DataNode的连接准备好以后,DataStreamer获取数据包缓冲区,然后将数据包从dataQueue队列挪到ackQueue队列,最后通过blockStream,写数据。如果数据包是最后一个,那么,DataStreamer将会写一个长度域为0的包,指示DataNode数据传输结束。
DataStreamer的循环在最后一个数据包写出去以后,会等待直到ackQueue队列为空(表明所有的应答已经被接收),然后做清理动作(包括关闭socket连接,ResponseProcessor线程等),退出线程。
ResponseProcessor相对来说比较简单,就是等待来自DataNode的应答。如果是成功的应答,则删除在ackQueue的包,如果有错误,那么,记录出错的DataNode,并设置标志位。


------------------------------------------------------------------------------------------------------------------------------------------------


有了上面的基础,我们可以来解剖DFSOutputStream了。先看构造函数:
    private DFSOutputStream(String src, long blockSize, Progressable progress,
        int bytesPerChecksum) throws IOException

    DFSOutputStream(String src, FsPermission masked, boolean overwrite,
        short replication, long blockSize, Progressable progress,
        int buffersize, int bytesPerChecksum) throws IOException

    DFSOutputStream(String src, int buffersize, Progressable progress,
        LocatedBlock lastBlock, FileStatus stat,
        int bytesPerChecksum) throws IOException {
这些构造函数的参数主要有:文件名src;进度回调函数progress(预留接口,目前未使用);数据块大小blockSize;Block副本数replication;每个校验chunk的大小bytesPerChecksum;文件权限masked;是否覆盖原文件标记overwrite;文件状态信息stat;文件的最后一个Block信息lastBlock;buffersize(?未见引用)。
后面两个构造函数会调用第一个构造函数,这个函数会调用父类的构造函数,并设置对象的src,blockSize,progress和checksum属性。
第二个构造函数会调用namenode.create方法,在文件空间中建立文件,并启动DataStreamer,它被DFSClient的create方法调用。第三个构造函数被DFSClient的append方法调用,显然,这种情况比价复杂,文件拥有一些数据块,添加数据往往添加在最后的数据块上。同时,append方法调用时,Client已经知道了最后一个Block的信息和文件的一些信息,如FileStatus中包含的Block大小,文件权限位等等。结合这些信息,构造函数需要计算并设置一些对象成员变量的值,并试图从可能的错误中恢复(调用processDatanodeError),最后启动DataStreamer。
我们先看正常流程,前面已经分析过,通过FSOutputSummer,HDFS客户端能将流转换成package,这个包是通过writeChunk,发送出去的,下面是它们的调用关系。

8.PNG


在检查完一系列的状态以后,writeChunk先等待,直到dataQueue中未发送的包小于门限值。如果现在没有可用的Packet对象,则创建一个Packet对象,往Packet中写数据,包括校验值和数据。如果数据包被写满,那么,将它放入发送队列dataQueue中。writeChunk的过程比较简单,这里的写入,也只是把数据写到本地队列,等待DataStreamer发送,没有实际写到DataNode上。
createBlockOutputStream用于建立到第一个DataNode的连接,它的声明如下:
private boolean createBlockOutputStream(DatanodeInfo[] nodes, String client,
                    boolean recoveryFlag)
nodes是所有接收数据的DataNode列表,client就是客户端名称,recoveryFlag指示是否是为错误恢复建立的连接。createBlockOutputStream很简单,打开到第一个DataNode的连接,然后发送下面格式的数据包,并等待来自DataNode的Ack。如果出错,记录出错的DataNode在nodes中的位置,设置errorIndex并返回false。


9.PNG



当recoveryFlag指示为真时,意味着这次写是一次恢复操作,对于DataNode来说,这意味着为写准备的临时文件(在tmp目录中)可能已经存在,需要进行一些特殊处理,具体请看FSDataset的实现。
当Client写数据需要一个新的Block的时候,可以调用nextBlockOutputStream方法。
    private DatanodeInfo[] nextBlockOutputStream(String client) throws IOException
这个方法的实现很简单,首先调用locateFollowingBlock(包含了重试和出错处理),通过namenode.addBlock获取一个新的数据块,返回的是DatanodeInfo列表,有了这个列表,就可以建立写数据的pipe了。下一个大动作就是调用上面的createBlockOutputStream,建立到DataNode的连接了。
有了上面的准备,我们来分析processDatanodeError,它的主要流程是:
  •            参数检查;
  •            关闭可能还打开着的blockStream和blockReplyStream;
  •            将未收到应答的数据块(在ackQueue中)挪到dataQueue中;
  •            循环执行:
1.      计算目前还活着的DataNode列表;
2.      选择一个主DataNode,通过DataNode RPC的recoverBlock方法启动它上面的恢复过程;
3.      处理可能的出错;
4.      处理恢复后Block可能的变化(如Stamp变化);
5.      调用createBlockOutputStream到DataNode的连接。

  •            启动ResponseProcessor。

这个过程涉及了DataNode上的recoverBlock方法和createBlockOutputStream中可能的Block恢复,是一个相当耗资源的方法,当系统出错的概率比较小,而且数据块上能恢复的数据很多(平均32M),还是值得这样做的。
写的流程就分析到着,接下来我们来看流的关闭,这个过程也涉及了一系列的方法,它们的调用关系如下:


10.PNG



flushInternal会一直等待到发送队列(包括可能的currentPacket)和应答队列都为空,这意味着数据都被DataNode顺利接收。
sync作用和UNIX的sync类似,将写入数据持久化。它首先调用父类的flushBuffer方法,将可能还没拷贝到DFSOutputStream的数据拷贝回来,然后调用flushInternal,等待所有的数据都写完。然后调用namenode.fsync,持久化命名空间上的数据。
closeInternal比较复杂一点,它首先调用父类的flushBuffer方法,将可能还没拷贝到DFSOutputStream的数据拷贝回来,然后调用flushInternal,等待所有的数据都写完。接着结束两个工作线程,关闭socket,最后调用amenode.complete,通知NameNode结束一次写操作。close方法先调用closeInternal,然后再本地的leasechecker中移除对应的信息。

----------------------------------------------------------------------------------------------------------------------------------------------------
前面分析的DFSClient内部类,占据了这个类的实现部分的2/3,我们来看剩下部分。
DFSClient的成员变量不多,而且大部分是系统的缺省配置参数,其中比较重要的是到NameNode的RPC客户端:
  public final ClientProtocol namenode;
  final private ClientProtocol rpcNamenode;
它们的差别是namenode在rpcNamenode的基础上,增加了失败重试功能。DFSClient中提供可各种构造它们的static函数,createClientDatanodeProtocolProxy用于生成到DataNode的RPC客户端。
DFSClient的构造函数也比价简单,就是初始化成员变量,close用于关闭DFSClient。
下面的功能,DFSClient只是简单地调用NameNode的对应方法(加一些简单的检查),就不罗嗦了:
setReplication/rename/delete/exists(通过getFileInfo的返回值是否为空判断)/listPaths/getFileInfo/setPermission/setOwner/getDiskStatus/totalRawCapacity/totalRawUsed/datanodeReport/setSafeMode/refreshNodes/metaSave/finalizeUpgrade/mkdirs/getContentSummary/setQuota/setTimes
DFSClient提供了各种create方法,它们最后都是构造一个OutputStream,并将文件名和生成的OutputStream加到leasechecker,完成创建动作。
append操作是通过namenode.append,获取最后的Block信息,然后构造一个OutputStream,并将文件名和生成的OutputStream加到leasechecker,完成append动作。
getFileChecksum用于获取文件的校验信息,它在得到数据块的位置信息后利用DataNode提供的OP_BLOCK_CHECKSUM操作,获取需要的数据,并综合起来。过程简单,方法主要是在处理OP_BLOCK_CHECKSUM需要交换的数据包。
DFSClient内部还有一些其它的辅助方法,都比较简单,就不再分析了。

上一篇
Hadoop二次开发必备,Hadoop源码分析(十四)
欢迎加入about云群371358502、39327136,云计算爱好者群,亦可关注about云腾讯认证空间||关注本站微信

已有(2)人评论

跳转到指定楼层
xiejin2008 发表于 2015-4-15 11:29:05
牛叉牛叉牛叉牛叉牛叉牛叉
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条