分享

MapReduce从HDFS读取文件的源码分析

howtodown 发表于 2014-11-15 18:15:56 [显示全部楼层] 只看大图 回帖奖励 阅读模式 关闭右栏 7 23476
本帖最后由 howtodown 于 2014-11-15 18:17 编辑

问题导读
1.LineRecordReader的作用是什么?
2.DFSInputStream负责什么事情?




  • 以Map任务读取文本数据为例:

    1.png
    1) LineRecordReader负责对文件分割的定位,以及对读取每一行内容的封装供用户Map任务使用。每次在定位在文件中不为0的位置时,多读取一行,因为前一个处理该位置之前的数据时,会完整把该一行已经读取并处理。

    2.png
    2) LineReader负责对所要访问文件输入流的数据进行每一行读取,只实现对每一行读取的逻辑。
    3.png

    3) DFSDataInputStream封装了DFSInputStream的实现,直接调用DFSInputStream接口完成。
    4.png

    4) DFSInputStream,负责对所访问文件block的管理,提供定位block位置和提供block数据流读取的接口。
    定位到所读取的位置在哪个block中后,打开与该block的socket连接,然后从封装socket的RemoteBlockReader中读取数据:
    5.png

    根据定位到的block在哪个主机,打开与该主机的socket连接:
    6.png

    获取block信息后,开始获取数据流:
    7.png

    5) RemoteBlockReader,对打开socket获取DataNode输入流DataInputStream进行封装,开始读取文件数据,文件数据是DataNode按照每个chunk封装一次传输给客户端,客户端给每个chunk检查数据完整性。
    提供给上层读取socket中数据流接口:
    8.png

    开始从一个个chunk中读取数据:

    9.png

    每个chunk的读取:
    10.png

    6) Block传输读协议
    先是客户端打开连接后,向DataNode输入block的信息:
    11.png

    然后DataNode按照一个个chunk向DFSClient发送数据流供读,chunk的数据格式,按照数据读取先后顺序罗列:
    12.png





已有(7)人评论

跳转到指定楼层
gwgyk 发表于 2014-11-15 22:19:39
你好,我总结了下,麻烦你看看这样的调用关系是不是正确:
MapTaskrunNewMapper()中的“mapper.run(mapperContext);” --》MapContext.nextKeyValue()--》LineRecoredReader.nextKeyValue()--》LineReader.readLine()--》readDefaultLine()--》InputStream.read()接口--》DFSClient.read()-->read(byte buf[], int off, int len)

麻烦你帮我检查下,这样的调用关系是否正确。我主要是想知道,到底是哪儿调用了read(byte buf[], int off, int len)方法。谢谢啦
回复

使用道具 举报

howtodown 发表于 2014-11-15 23:52:10
gwgyk 发表于 2014-11-15 22:19
你好,我总结了下,麻烦你看看这样的调用关系是不是正确:
MapTaskrunNewMapper()中的“mapper.run(mapper ...

代码是相当复杂的,不要陷入太深,建议对hadoop有一定理解之后,在深入研究,否则会研究越来越糊涂,


LineRecordReader.next(LongWritable key, Text value)  
  LineReader.readLine(Text str, int maxLineLength, int maxBytesToConsume)  
    DataInputStream.read(byte b[])  /* DFSDataInputStream继承此方法 */  
      DFSInputStream.read(long position, byte[] buffer, int offset, int length)  
        DFSInputStream.fetchBlockByteRange(LocatedBlock block, long start,long end, byte[] buf, int offset)  
          BlockReader.readAll(byte[] buf, int offset, int len)  
            FSInputChecker.readFully(InputStream stm, byte[] buf, int offset, int len)  
              BlockReader.read(byte[] buf, int off, int len)  
                FSInputChecker.read(byte[] b, int off, int len)  
                  FSInputChecker.read1(byte b[], int off, int len)  
                    FSInputChecker.readChecksumChunk(byte b[], final int off, final int len)  
                      BlockReader.readChunk(long pos, byte[] buf, int offset, int len, byte[] checksumBuf)  
                        IOUtils.readFullyreadFully( InputStream in, byte buf[], int off, int len)  
                          DataInputStream.read(byte b[], int off, int len)  
                            BufferedInputStream.read(byte b[], int off, int len)  
                              BufferedInputStream.read1(byte[] b, int off, int len)  
                                org.apache.hadoop.net.SocketInputStream.read(byte[] b, int off, int len)  
                                  org.apache.hadoop.net.SocketInputStream.read(ByteBuffer dst)  
                                    org.apache.hadoop.net.SocketIOWithTimeout.doIO(ByteBuffer buf, int ops)  
                                      org.apache.hadoop.net.SocketInputStream.Reader.performIO(ByteBuffer buf)  
                                        sun.nio.ch.SocketChannelImpl.read(ByteBuffer buf)  
                                          sun.nio.ch.IOUtiil.read(FileDescriptor fd, ByteBuffer dst, long position, NativeDispatcher nd, Object lock)  
                                            sun.nio.ch.IOUtiil.readIntoNativeBuffer(FileDescriptor fd, ByteBuffer bb, long position, NativeDispatcher nd,Object lock)  
                                              sun.nio.ch.SocketDispatcher.read(FileDescriptor fd, long address, int len)  
                                                 sun.nio.ch.SocketDispatcher.read0(FileDescriptor fd, long address, int len) /* Native Method,根据不同的JDK实现不同 */  

回复

使用道具 举报

howtodown 发表于 2014-11-15 23:55:37


下面也是相关内容可以参考:

我们可以开始从系统的外部来了解HDFS了,DFSClient提供了连接到HDFS系统并执行文件操作的基本功能。DFSClient也是个大家伙,我们先分析它的一些内部类。我们先看LeaseChecker。租约是客户端对文件写操作时需要获取的一个凭证,前面分析NameNode时,已经了解了租约,INodeFileUnderConstruction的关系,INodeFileUnderConstruction只有在文件写的时候存在。客户端的租约管理很简单,包括了增加的put和删除的remove方法,run方法会定期执行,并通过ClientProtocl的renewLease,自动延长租约。

接下来我们来分析内部为文件读引入的类。

1.jpg



InputStream是系统的虚类,提供了3个read方法,一个skip(跳过数据)方法,一个available方法(目前流中可读的字节数),一个close方法和几个在输入流中做标记的方法(mark:标记,reset:回到标记点和markSupported:能力查询)。
FSInputStream也是一个虚类,它将接口Seekable和PositionedReadable混插到类中。Seekable提供了可以在流中定位的能力(seek,getPos和seekToNewSource),而PositionedReadable提高了从某个位置开始读的方法(一个read方法和两个readFully方法)。
FSInputChecker在FSInputStream的基础上,加入了HDFS中需要的校验功能。校验在readChecksumChunk中实现,并在内部的read1方法中调用。所有的read调用,最终都是使用read1读数据并做校验。如果校验出错,抛出异常ChecksumException。
有了支持校验功能的输入流,就可以开始构建基于Block的输入流了。我们先回顾前面提到的读数据块的请求协议:


2.jpg


然后我们来分析一下创建BlockReader需要的参数,newBlockReader最复杂的请求如下:
   public static BlockReader newBlockReader( Socket sock, String file,
                                       long blockId,
                                       long genStamp,
                                       long startOffset, long len,
                                       int bufferSize, boolean verifyChecksum,
                                       String clientName)
                                       throws IOException
其中,sock为到DataNode的socket连接,file是文件名(只是用于日志输出),其它的参数含义都很清楚,和协议基本是一一对应的。该方法会和DataNode进行对话,发送上面的读数据块的请求,处理应答并构造BlockReader对象(BlockReader的构造函数基本上只有赋值操作)。
BlockReader的readChunk用于处理DataNode送过来的数据,格式前面我们已经讨论过了,如下图。


3.jpg


读数据用的read,会调用父类FSInputChecker的read,最后调用readChunk,如下:

4.jpg


read如果发现读到正确的校验码,则用过checksumOk方法,向DataNode发送成功应达。
BlockReader的主要流程就介绍完了,接下来分析DFSInputStream,它封装了DFSClient读文件内容的功能。在它的内部,不但要处理和NameNode的通信,同时通过BlockReader,处理和DataNode的交互。

DFSInputStream记录Block的成员变量是:
    private LocatedBlocks locatedBlocks = null;
它不但保持了文件对应的Block序列,还保持了管理Block的DataNode的信息,是DFSInputStream中最重要的成员变量。DFSInputStream的构造函数,通过类内部的openInfo方法,获取这个变量的值。openInfo间接调用了NameNode的getBlockLocations,获取LocatedBlocks。
DFSInputStream中处理数据块位置的还有下面一些函数:


    synchronized List<LocatedBlock> getAllBlocks() throws IOException
    private LocatedBlock getBlockAt(long offset) throws IOException
    private synchronized List<LocatedBlock> getBlockRange(long offset,
                                                          long length)
    private synchronized DatanodeInfo blockSeekTo(long target) throws IOException

它们的功能都很清楚,需要注意的是他们处理过程中可能会调用再次调用NameNode的getBlockLocations,使得流程比较复杂。blockSeekTo还会创建对应的BlockReader对象,它被几个重要的方法调用(如下图)。在打开到DataNode之前,blockSeekTo会调用chooseDataNode,选择一个现在活着的DataNode。

5.jpg


通过上面的分析,我们已经知道了在什么时候会连接NameNode,什么时候会打开到DataNode的连接。下面我们来看读数据。read方法定义如下:
    public int read(long position, byte[] buffer, int offset, int length)
该方法会从流的position位置开始,读取最多length个byte到buffer中offset开始的空间中。参数检测完以后,通过getBlockRange获取要读取的数据块对应的block范围,然后,利用fetchBlockByteRange方法,读取需要的数据。
fetchBlockByteRange从某一个数据块中读取一段数据,定义如下:

    private void fetchBlockByteRange(LocatedBlock block, long start,
                                     long end, byte[] buf, int offset)

由于读取的内容都在一个数据块内部,这个方法会创建BlockReader,然后利用BlockReader的readAll方法,读取数据。读的过程中如果发生校验错,那么,还会通过reportBadBlocks,向NameNode报告校验错。
另一个读方法是:
    public synchronized int read(byte buf[], int off, int len) throws IOException
它在流的当前位置(可以通过seek方法调整)读取数据。首先它会判断当前流的位置,如果已经越过了对象现在的blockReader能读取的范围(当上次read读到数据块的尾部时,会发生这中情况),那么通过blockSeekTo打开到下一个数据块的blockReader。然后,read在当前的这个数据块中通过readBuffer读数据。主要,这个read方法只在一块数据块中读取数据,就是说,如果还有空间可以存放数据但已经到了数据块的尾部,它不会打开到下一个数据块的BlockReader继续读,而是返回,返回值包含了以读取数据的长度。



回复

使用道具 举报

gwgyk 发表于 2014-11-16 17:54:15
你好,我想问下,第一行的LineRecordReader.next(LongWritable key, Text value)  是MapTaskrunNewMapper()中的“mapper.run(mapperContext);” --》MapContext.nextKeyValue()--》LineRecoredReader.nextKeyValue()这样调用过来的吗?
不过next()方法名可能变成nextKeyValue()了
回复

使用道具 举报

howtodown 发表于 2014-11-17 09:27:53
gwgyk 发表于 2014-11-16 17:54
你好,我想问下,第一行的LineRecordReader.next(LongWritable key, Text value)  是MapTaskrunNewMapper() ...
对的,版本有所不同
回复

使用道具 举报

gwgyk 发表于 2014-11-17 15:14:26
本帖最后由 gwgyk 于 2014-11-17 15:30 编辑
howtodown 发表于 2014-11-15 23:52
代码是相当复杂的,不要陷入太深,建议对hadoop有一定理解之后,在深入研究,否则会研究越来越糊涂,

...

不好意思,因为可能是因为版本问题,你列出来的方法调用层次,我自己对不上。我用的是hadoop1.2.1版本,所以能不能麻烦你帮我看看我下面说的这个调用层次是不是正确?
(MapTask) runNewMapper()  --> mapper.run(mapperContext)
    (Mapper) run(Context context)  --> context.nextKeyValue()
        (MapContext) nextKeyValue()  -->  reader.nextKeyValue()
             (LineRecordReader) nextKeyValue() --> in.readLine();
                 (LineRecordReader) readLine) --> readDefaultLine()
                     (LineRecordReader) readDefaultLine()  --> in.read(buffer)
                         (InputStream) read(byte b[]) --> read(b, 0, b.length)
                            (InputStream) read(byte b[], int off, int len) --> read()
                                 (InputStream) read() -->(子类实现) DFSClient.DFSInputStream.read() --> read( oneByteBuf, 0, 1 )
                                    (DFSClient.DFSInputStream)read(byte buf[], int off, int len)  --> blockSeekTo()、readBuffer()等方法

关键就是第9行中,(InputStream) read()方法最后是不是由DFSClient.DFSInputStream.read() 来实现的?如果是这样的话,我就能明白read()是怎样被调用的了。
不好意思啊,太麻烦你了。非常感谢啦



回复

使用道具 举报

eclipsesky 发表于 2014-11-18 15:55:26
版主 你小时候长得好丑 哈哈
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条