分享

Hadoop二次开发必备,Hadoop源码分析(四)

本帖最后由 pig2 于 2014-1-16 00:29 编辑

分析完Storage相关的类以后,我们来看下一个大家伙,FSDataset相关的类。
上面介绍Storage时,我们并没有涉及到数据块Block的操作,所有和数据块相关的操作,都在FSDataset相关的类中进行处理。下面是类图:
12.PNG

Block是对一个数据块的抽象,通过前面的讨论我们知道一个Block对应着两个文件,其中一个存数据,一个存校验信息,如下:
blk_3148782637964391313
blk_3148782637964391313_242812.meta
上面的信息中,blockId是3148782637964391313,242812是数据块的版本号,当然,系统还会保存数据块的大小,在类中是属性numBytes。Block提供了一系列的方法来操作对象的属性。

DatanodeBlockInfo存放的是Block在文件系统上的信息。它保存了Block存放的卷(FSVolume),文件名和detach状态。这里有必要解释一下detach状态:我们前面分析过,系统在升级时会创建一个snapshot,snapshot的文件和current里的数据块文件和数据块元文件是通过硬链接,指向了相同的内容。当我们需要改变current里的文件时,如果不进行detach操作,那么,修改的内容就会影响snapshot里的文件,这时,我们需要将对应的硬链接解除掉。方法很简单,就是在临时文件夹里,复制文件,然后将临时文件改名成为current里的对应文件,这样的话,current里的文件和snapshot里的文件就detach了。这样的技术,也叫copy-on-write,是一种有效提高系统性能的方法。DatanodeBlockInfo中的detachBlock,能够对Block对应的数据文件和元数据文件进行detach操作。

介绍完类Block和DatanodeBlockInfo后,我们来看FSVolumeSet,FSVolume和FSDir。我们知道在一个DataNode上可以指定多个Storage来存储数据块,由于HDFS规定了一个目录能存放Block的数目,所以一个Storage上存在多个目录。对应的,FSDataset中用FSVolume来对应一个Storage,FSDir对应一个目录,所有的FSVolume由FSVolumeSet管理,FSDataset中通过一个FSVolumeSet对象,就可以管理它的所有存储空间。


FSDir对应着HDFS中的一个目录,目录里存放着数据块文件和它的元文件。FSDir的一个重要的操作,就是在添加一个Block时,根据需要有时会扩展目录结构,上面提过,一个Storage上存在多个目录,所有的目录,都对应着一个FSDir,目录的关系,也由FSDir保存。FSDir的getBlockInfo方法分析目录下的所有数据块文件信息,生成Block对象,存放到一个集合中。getVolumeMap方法能,则会建立Block和DatanodeBlockInfo的关系。以上两个方法,用于系统启动时搜集所有的数据块信息,便于后面快速访问。


FSVolume对应着是某一个Storage。数据块文件,detach文件和临时文件都是通过FSVolume来管理的,这个其实很自然,在同一个存储系统上移动文件,往往只需要修改文件存储信息,不需要搬数据。FSVolume有一个recoverDetachedBlocks的方法,用于恢复detach文件。和Storage的状态管理一样,detach文件有可能在复制文件时系统崩溃,需要对detach的操作进行回复。FSVolume还会启动一个线程,不断更新FSVolume所在文件系统的剩余容量。创建Block的时候,系统会根据各个FSVolume的容量,来确认Block的存放位置。

FSVolumeSet就不讨论了,它管理着所有的FSVolume。

HDFS中,对一个chunk的写会使文件处于活跃状态,FSDataset中引入了类ActiveFile。ActiveFile对象保存了一个文件,和操作这个文件的线程。注意,线程有可能有多个。ActiveFile的构造函数会自动地把当前线程加入其中。

有了上面的基础,我们可以开始分析FSDataset。FSDataset实现了接口FSDatasetInterface。FSDatasetInterface是DataNode对底层存储的抽象。

下面给出了FSDataset的关键成员变量:
  1. FSVolumeSet volumes;
  2.   private HashMap<Block,ActiveFile> ongoingCreates = new HashMap<Block,ActiveFile>();
  3.   private HashMap<Block,DatanodeBlockInfo> volumeMap = null;
复制代码
其中,volumes就是FSDataset使用的所有Storage,ongoingCreates是Block到ActiveFile的映射,也就是说,说有正在创建的Block,都会记录在ongoingCreates里。

下面我们讨论FSDataset中的方法。


public long getMetaDataLength(Block b) throws IOException;
得到一个block的元数据长度。通过blockID,找对应的元数据文件,返回文件长度。


public MetaDataInputStream getMetaDataInputStream(Block b) throws IOException;
得到一个block的元数据输入流。通过blockID,找对应的元数据文件,在上面打开输入流。下面对于类似的简单方法,我们就不再仔细讨论了。


public booleanmetaFileExists(Block b) throwsIOException;
判断block的元数据的元数据文件是否存在。简单方法。


public longgetLength(Block b) throwsIOException;
block的长度。简单方法。


public Block getStoredBlock(longblkid) throwsIOException;
通过BlockID,找到对应的Block。简单方法。


public InputStream getBlockInputStream(Block b) throws IOException;
public InputStream getBlockInputStream(Block b, long seekOffset) throws IOException;
得到Block数据的输入流。简单方法。


public BlockInputStreams getTmpInputStreams(Block b, long blkoff, long ckoff) throws IOException;
得到Block的临时输入流。注意,临时输入流是指对应的文件处于tmp目录中。新创建块时,块数据应该写在tmp目录中,直到写操作成功,文件才会被移动到current目录中,如果失败,就不会影响current目录了。简单方法。


public BlockWriteStreams writeToBlock(Block b, boolean isRecovery) throws IOException;
得到一个block的输出流。BlockWriteStreams既包含了数据输出流,也包含了元数据(校验文件)输出流,这是一个相当复杂的方法。

参数isRecovery说明这次写是不是对以前失败的写的一次恢复操作。我们先看正常的写操作流程:首先,如果输入的block是个正常的数据块,或当前的block已经有线程在写,writeToBlock会抛出一个异常。否则,将创建相应的临时数据文件和临时元数据文件,并把相关信息,创建一个ActiveFile对象,记录到ongoingCreates中,并创建返回的BlockWriteStreams。前面我们已经提过,建立新的ActiveFile时,当前线程会自动保存在ActiveFile的threads中。

我们以blk_3148782637964391313为例,当DataNode需要为Block ID为3148782637964391313创建写流时,DataNode创建文件tmp/blk_3148782637964391313做为临时数据文件,对应的meta文件是tmp/blk_3148782637964391313_XXXXXX.meta。其中XXXXXX是版本号。

isRecovery为true时,表明我们需要从某一次不成功的写中恢复,流程相对于正常流程复杂。如果不成功的写是由于提交(参考finalizeBlock方法)后的确认信息没有收到,先创建一个detached文件(备份)。接着,writeToBlock检查是否有还有对文件写的线程,如果有,则通过线程的interrupt方法,强制结束线程。这就是说,如果有线程还在写对应的文件块,该线程将被终止。同时,从ongoingCreates中移除对应的信息。接下来将根据临时文件是否存在,创建/复用临时数据文件和临时数据元文件。后续操作就和正常流程一样,根据相关信息,创建一个ActiveFile对象,记录到ongoingCreates中……

由于这块涉及了一些HDFS写文件时的策略,以后我们还会继续讨论这个话题。


public voidupdateBlock(Block oldblock, Block newblock) throws IOException;
更新一个block。这也是一个相当复杂的方法。

updateBlock的最外层是一个死循环,循环的结束条件,是没有任何和这个数据块相关的写线程。每次循环,updateBlock都会去调用一个叫tryUpdateBlock的内部方法。tryUpdateBlock发现已经没有线程在写这个块,就会跟新和这个数据块相关的信息,包括元文件和内存中的映射表volumeMap。如果tryUpdateBlock发现还有活跃的线程和该块关联,那么,updateBlock会试图结束该线程,并等在join上等待。


public voidfinalizeBlock(Block b) throwsIOException;
提交(或叫:结束finalize)通过writeToBlock打开的block,这意味着写过程没有出错,可以正式把Blocktmp文件夹放到current文件夹。

在FSDataset中,finalizeBlock将从ongoingCreates中删除对应的block,同时将block对应的DatanodeBlockInfo,放入volumeMap中。我们还是以blk_3148782637964391313为例,当DataNode提交Block ID为3148782637964391313数据块文件时,DataNode将把tmp/blk_3148782637964391313移到current下某一个目录,以subdir12为例,这是tmp/blk_3148782637964391313将会挪到current/subdir12/blk_3148782637964391313。对应的meta文件也在目录current/subdir12下。

public voidunfinalizeBlock(Block b) throwsIOException;
取消通过writeToBlock打开的block,与finalizeBlock方法作用相反。简单方法。
public booleanisValidBlock(Block b);
Block是否有效。简单方法。
public voidinvalidate(Block invalidBlks[]) throwsIOException;
使block变为无效。简单方法。
public void validateBlockMetadata(Block b) throws IOException;
检查block的有效性。简单方法。

下一篇


上一篇

已有(1)人评论

跳转到指定楼层
轩辕依梦Q 发表于 2014-10-19 19:56:10
谢谢楼主,收藏整个系列
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条