分享

Hadoop二次开发必备,Hadoop源码分析(十二)

本帖最后由 pig2 于 2014-1-16 00:20 编辑

下面是和目录树相关的方法。
  public boolean rename(String src, Stringdst) throwsIOException;
更改文件名。调用FSNamesystem的renameTo,干活的是renameToInternal,最终调用FSDirectory的renameTo方法,如果成功,更新租约的文件名,如下:
     changeLease(src, dst, dinfo);
  public boolean delete(String src) throws IOException;
  public boolean delete(String src, boolean recursive) throws IOException;
第一个已经废弃不用,使用第二个方法。
最终使用deleteInternal,该方法调用FSDirectory.delete()。
  public boolean mkdirs(String src,FsPermission masked) throwsIOException;
在做完一系列检查以后,调用FSDirectory.mkdirs()。
  publicFileStatus[] getListing(String src) throws IOException;
前面我们已经讨论了。

下面是其它和系统维护管理的方法。
  public voidrenewLease(String clientName) throws IOException;
就是调用了一下leaseManager.renewLease(holder),没有其他的事情需要做,简单。
  public void refreshNodes() throws IOException;
还记得我们前面分析过NameNode上有个DataNode在线列表和DataNode离线列表吗,这个命令可以让NameNode从新读这两个文件。当然,根据前后DataNode的状态,一共有4种情况,其中有3种需要修改。
对于从工作状态变为离线的,需要将上面的DataNode复制到其他的DataNode,需要调用updateNeededReplications方法(前面我们已经讨论过这个方法了)。
对于从离线变为工作的DataNode,只需要改变一下状态。
  public void finalizeUpgrade() throws IOException;
finalize一个升级,确认客户端有超级用户权限以后,调用FSImage.finalizeUpgrade()。
  public void fsync(String src, Stringclient) throwsIOException;
将文件信息持久化。在检查租约信息后,调用FSDirectory的persistBlocks,将文件的原信息通过logOpenFile(path, file)写日志。

搞定ClientProtocol,接下来是DatanodeProtocol部分。接口如下:
8.PNG

public DatanodeRegistration register(DatanodeRegistration nodeReg                                       ) throws IOException用于DataNode向NameNode登记。输入和输出参数都是DatanodeRegistration,类图如下:
9.PNG

前面讨论DataNode的时候,我们已经讲过了DataNode的注册过程,我们来看NameNode的过程。下面是主要步骤:l          检查该DataNode是否能接入到NameNode;
l          准备应答,更新请求的DatanodeID;
l          从datanodeMap(保存了StorageID à DatanodeDescriptor的映射,用于保证DataNode使用的Storage的一致性)得到对应的DatanodeDescriptor,为nodeS;
l          从Host2NodesMap(主机名到DatanodeDescriptor数组的映射)中获取DatanodeDescriptor,为nodeN;
l          如果nodeN!=null同时nodeS!=nodeN(后面的条件表明表明DataNode上使用的Storage发生变化),那么我们需要先在系统中删除nodeN(removeDatanode,下面再讨论),并在Host2NodesMap中删除nodeN;
l          如果nodeS存在,表明前面已经注册过,则:
1.      更新网络拓扑(保存在NetworkTopology),首先在NetworkTopology中删除nodeS,然后跟新nodeS的相关信息,调用resolveNetworkLocation,获得nodeS的位置,并从新加到NetworkTopology里;
2.      更新心跳信息(register也是心跳);
l          如果nodeS不存在,表明这是一个新注册的DataNode,执行
1.      如果注册信息的storageID为空,表明这是一个全新的DataNode,分配storageID;
2.      创建DatanodeDescriptor,调用resolveNetworkLocation,获得位置信息;
3.      调用unprotectedAddDatanode(后面分析)添加节点;
4.      添加节点到NetworkTopology中;
5.      添加到心跳数组中。上面的过程,我们遗留了两个方法没分析,removeDatanode的流程如下:
l          更新系统的状态,包括capacityTotal,capacityUsed,capacityRemaining和totalLoad;
l          从心跳数组中删除节点,并标记节点isAlive属性为false;
l          从BlocksMap中删除这个节点上的所有block,用了(三零)分析到的removeStoredBlock方法;
l          调用unprotectedAddDatanode;
l          从NetworkTopology中删除节点信息。
unprotectedAddDatanode很简单,它只是更新了Host2NodesMap的信息。


----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------


下面来看一个大家伙:
public DatanodeCommand sendHeartbeat(DatanodeRegistration nodeReg,
                                       long capacity,
                                       long dfsUsed,
                                       longremaining,
                                       int xmitsInProgress,
                                       int xceiverCount) throws IOException
DataNode发送到NameNode的心跳信息。细心的人会发现,请求的内容还是DatanodeRegistration,应答换成DatanodeCommand了。DatanodeCommand类图如下:
前面介绍DataNode时,已经分析过了DatanodeCommand支持的命令:
  DNA_TRANSFER:拷贝数据块到其他DataNode
  DNA_INVALIDATE:删除数据块
  DNA_SHUTDOWN:关闭DataNode
  DNA_REGISTERDataNode重新注册
  DNA_FINALIZE:提交升级
  DNA_RECOVERBLOCK:恢复数据块

10.PNG



有了上面这些基础,我们来看FSNamesystem.handleHeartbeat的处理过程:
l           调用getDatanode方法找对应的DatanodeDescriptor,保存于变量nodeinfo(可能为null)中,如果现有NameNode上记录的StorageID和请求的不一样,返回DatanodeCommand.REGISTER,让DataNode从新注册。
l           如果发现当前节点需要关闭(已经isDecommissioned),抛异常DisallowedDatanodeException。
l           nodeinfo是空或者现在状态不是活的,返回DatanodeCommand.REGISTER,让DataNode从新注册。
l           更新系统的状态,包括capacityTotal,capacityUsed,capacityRemaining和totalLoad;
l           接下来按顺序看有没有可能的恢复数据块/拷贝数据块到其他DataNode/删除数据块/升级命令(不讨论)。一次返回只能有一条命令,按上面优先顺序。
下面分析应答的命令是如何构造的。

首先是DNA_RECOVERBLOCK(恢复数据块),那是个非常长的流程,同时需要回去讨论DataNode上的一些功能,我们在后面介绍它。

对于DNA_TRANSFER(拷贝数据块到其他DataNode),从DatanodeDescriptor.replicateBlocks中取出尽可能多的项目,放到BlockCommand中。在DataNode中,命令由transferBlocks执行,前面我们已经分析过啦。

删除数据块DNA_INVALIDATE也很简单,从DatanodeDescriptor.invalidateBlocks中获取尽可能多的项目,放到BlockCommand中,DataNode中的动作,我们也分析过。

我们来讨论DNA_RECOVERBLOCK(恢复数据块),在讨论DataNode的过程中,我们没有讲这个命令是用来干什么的,还有它在DataNode上的处理流程,是好好分析分析这个流程的时候了。DNA_RECOVERBLOCK命令通过DatanodeDescriptor.getLeaseRecoveryCommand获取,获取过程很简单,将DatanodeDescriptor对象中队列recoverBlocks的所有内容取出,放入BlockCommand的Block中,设置BlockCommand为DNA_RECOVERBLOCK,就OK了。

关键是,这个队列里的信息是用来干什么的。我们先来看那些操作会向这个队列加东西,调用关系图如下:

11.PNG


租约有两个超时时间,一个被称为软超时(1分钟),另一个是硬超时(1小时)。如果租约软超时,那么就会触发internalReleaseLease方法,如下:
  voidinternalReleaseLease(Lease lease, String src) throws IOException
该方法执行:

l           检查src对应的INodeFile,如果不存在,不处于构造状态,返回;
l           文件处于构造状态,而文件目标DataNode为空,而且没有数据块,则finalize该文件(该过程在completeFileInternal中已经讨论过,租约在过程中被释放),并返回;
l           文件处于构造状态,而文件目标DataNode为空,数据块非空,则将最后一个数据块存放的DataNode目标取出(在BlocksMap中),然后设置为文件现在的目标DataNode;
l           调用INodeFileUnderConstruction.assignPrimaryDatanode,该过程会挑选一个目前还活着的DataNode,作为租约的主节点,并把<block,block目标DataNode数组>加到该DataNode的recoverBlocks队列中;
l           更新租约。
上面分析了租约软超时的情况下NameNode发生租约恢复的过程。DataNode上收到这个命令后,将会启动一个新的线程,该线程为每个Block调用recoverBlock方法:recoverBlock(blocks, false, targets, true)。
  private LocatedBlockrecoverBlock(Block block, booleankeepLength,
     DatanodeID[] datanodeids, booleancloseFile) throwsIOException
它的流程并不复杂,但是分支很多,如下图(蓝线是上面输入,没有异常走的流程):


12.PNG







首先是判断进来的Block是否在ongoingRecovery中,如果存在,返回,不存在,加到ongoingRecovery中。


接下来是个循环(框内部分是循环体,奇怪,没找到表示循环的符号),对每一个DataNode,获取Block的BlockMetaDataInfo(下面还会分析),这需要调用到DataNode间通信的接口上的方法getBlockMetaDataInfo。然后分情况看要不要把信息保存下来(图中间的几个判断),其中包括要进行同步的节点。
根据参数,更新数据块信息,然后调用syncBlock并返回syncBlock生产的LocatedBlock。
上面的这一圈,对于我们这个输入常数来说,就是把Block的长度,更新成为拥有最新时间戳的最小长度值,并得到要更新的节点列表,然后调用syncBlock更新各节点。
getBlockMetaDataInfo用于获取Block的BlockMetaDataInfo,包括Block的generationStamp,最后校验时间,同时它还会检查数据块文件的元信息,如果出错,会抛出异常。
syncBlock定义如下:
private LocatedBlock syncBlock(Block block, List<BlockRecord>syncList,
     booleancloseFile)
它的流程是:
l          如果syncList为空,通过commitBlockSynchronization向NameNode提交这次恢复;
l          syncList不为空,那么先NameNode申请一个新的Stamp,并根据上面得到的长度,构造一个新的数据块信息newblock;
l          对于没一个syncList中的DataNode,调用它们上面的updateBlock,更新信息;更新信息如果返回OK,记录下来;
l          如果更新了信息的DataNode不为空,调用commitBlockSynchronization提交这次恢复;并生成LocatedBlock;
l          如果更新的DataNode为空,抛异常。
通过syncBlock,所有需要恢复的DataNode上的Block信息都被更新。
DataNode上的updateBlock方法我们前面已经介绍了,就不再分析。
下面我们来看NameNode的commitBlockSynchronization方法,它在上面的过程中用于提交数据块恢复:
public voidcommitBlockSynchronization(Block block,
     longnewgenerationstamp, longnewlength,
     booleancloseFile, booleandeleteblock, DatanodeID[] newtargets
     )
参数分别是block,数据块;newgenerationstamp,新的时间戳;newlength,新长度;closeFile,是否关闭文件,deleteblock,是否删除文件;newtargets,新的目标列表。
上面的两次调用,输入参数分别是:
commitBlockSynchronization(block, 0, 0, closeFile, true,DatanodeID.EMPTY_ARRAY);
commitBlockSynchronization(block,newblock.getGenerationStamp(), newblock.getNumBytes(), closeFile, false, nlist);
处理流程是:
l          参数检查;
l          获取对应的文件,记为pendingFile;
l          BlocksMap中删除老的信息;
l          如果deleteblock为true,从pendingFile删除Block记录;
l          否则,更新Block的信息;
l          如果不关闭文件,那么写日志保存更新,返回;
l          关闭文件的话,调用finalizeINodeFileUnderConstruction。
这块比较复杂,不仅涉及了NameNode和DataNode间的通信,而且还存在对于DataNode和DataNode间的通信(DataNode间的通信就只支持这两个方法,如下图)。后面介绍DFSClient的时候,我们还会再回来分析它的功能,以获取全面的理解。
13.PNG

下一篇


上一篇

欢迎加入about云群425860289432264021 ,云计算爱好者群,关注about云腾讯认证空间

没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条