分享

Hadoop源代码分析:包mapreduce.lib.input及包mapreduce.lib.map(6)

xng2012 发表于 2014-1-16 16:40:48 [显示全部楼层] 只看大图 回帖奖励 阅读模式 关闭右栏 0 8372
本帖最后由 pig2 于 2014-1-16 17:17 编辑

一、包mapreduce.lib.input

接下来我们按照MapReduce过程中数据流动的顺序,来分解org.apache.hadoop.mapreduce.lib.*的相关内容,并介绍对应的基类的功能。首先是input部分,它实现了MapReduce的数据输入部分。类图如下:

24.PNG


类图的右上角是InputFormat,它描述了一个MapReduceJob的输入,通过InputFormat,Hadoop可以:

  •            检查MapReduce输入数据的正确性;
  •            将输入数据切分为逻辑块InputSplit,这些块会分配给Mapper;
  •            提供一个RecordReader实现,Mapper用该实现从InputSplit中读取输入的<K,V>对。
在org.apache.hadoop.mapreduce.lib.input中,Hadoop为所有基于文件的InputFormat提供了一个虚基类FileInputFormat。下面几个参数可以用于配置FileInputFormat:
  •            mapred.input.pathFilter.class:输入文件过滤器,通过过滤器的文件才会加入InputFormat;
  •            mapred.min.split.size:最小的划分大小;
  •            mapred.max.split.size:最大的划分大小;
  •            mapred.input.dir:输入路径,用逗号做分割。

类中比较重要的方法有:
  protected List<FileStatus> listStatus(Configuration job)
递归获取输入数据目录中的所有文件(包括文件信息),输入的job是系统运行的配置Configuration,包含了上面我们提到的参数。


  public List<InputSplit> getSplits(JobContext context)


将输入划分为InputSplit,包含两个循环,第一个循环处理所有的文件,对于每一个文件,根据输入的划分最大/最小值,循环得到文件上的划分。注意,划分不会跨越文件。


FileInputFormat没有实现InputFormat的createRecordReader方法。
FileInputFormat有两个子类,SequenceFileInputFormat是Hadoop定义的一种二进制形式存放的键/值文件(参考http://hadoop.apache.org/core/do ... o/SequenceFile.html),它有自己定义的文件布局。由于它有特殊的扩展名,所以SequenceFileInputFormat重载了listStatus,同时,它实现了createRecordReader,返回一个SequenceFileRecordReader对象。TextInputFormat处理的是文本文件,createRecordReader返回的是LineRecordReader的实例。这两个类都没有重载FileInputFormat的getSplits方法,那么,在他们对于的RecordReader中,必须考虑FileInputFormat对输入的划分方式。


FileInputFormat的getSplits,返回的是FileSplit。这是一个很简单的类,包含的属性(文件名,起始偏移量,划分的长度和可能的目标机器)已经足以说明这个类的功能。
RecordReader用于在划分中读取<Key,Value>对。RecordReader有五个虚方法,分别是:


  •            initialize:初始化,输入参数包括该Reader工作的数据划分InputSplit和Job的上下文context;
  •            nextKey:得到输入的下一个Key,如果数据划分已经没有新的记录,返回空;
  •            nextValue:得到Key对应的Value,必须在调用nextKey后调用;
  •            getProgress:得到现在的进度;
  •            close,来自java.io的Closeable接口,用于清理RecordReader。



我们以LineRecordReader为例,来分析RecordReader的构成。前面我们已经分析过FileInputFormat对文件的划分了,划分完的Split包括了文件名,起始偏移量,划分的长度。由于文件是文本文件,LineRecordReader的初始化方法initialize会创建一个基于行的读取对象LineReader(定义在org.apache.hadoop.util中,我们就不分析啦),然后跳过输入的最开始的部分(只在Split的起始偏移量不为0的情况下进行,这时最开始的部分可能是上一个Split的最后一行的一部分)。nextKey的处理很简单,它使用当前的偏移量作为Key,nextValue当然就是偏移量开始的那一行了(如果行很长,可能出现截断)。进度getProgress和close都很简单。



---------------------------------------------------------------------------------------------------------------------------------------------------

二、包mapreduce.lib.map

Hadoop的MapReduce框架中,Map动作通过Mapper类来抽象。一般来说,我们会实现自己特殊的Mapper,并注册到系统中,执行时,我们的Mapper会被MapReduce框架调用。Mapper类很简单,包括一个内部类和四个方法,静态结构图如下:

25.PNG


内部类Context继承自MapContext,并没有引入任何新的方法。


Mapper的四个方法是setup,map,cleanup和run。其中,setup和cleanup用于管理Mapper生命周期中的资源,setup在完成Mapper构造,即将开始执行map动作前调用,cleanup则在所有的map动作完成后被调用。方法map用于对一次输入的key/value对进行map动作。run方法执行了上面描述的过程,它调用setup,让后迭代所有的key/value对,进行map,最后调用cleanup。


org.apache.hadoop.mapreduce.lib.map中实现了Mapper的三个子类,分别是InverseMapper(将输入<key, value> map为输出<value, key>),MultithreadedMapper(多线程执行map方法)和TokenCounterMapper(对输入的value分解为token并计数)。其中最复杂的是MultithreadedMapper,我们就以它为例,来分析Mapper的实现。


MultithreadedMapper会启动多个线程执行另一个Mapper的map方法,它会启动mapred.map.multithreadedrunner.threads(配置项)个线程执行Mapper:mapred.map.multithreadedrunner.class(配置项)。MultithreadedMapper重写了基类Mapper的run方法,启动N个线程(对应的类为MapRunner)执行mapred.map.multithreadedrunner.class(我们称为目标Mapper)的run方法(就是说,目标Mapper的setup和cleanup会被执行多次)。目标Mapper共享同一份InputSplit,这就意味着,对InputSplit的数据读必须线程安全。为此,MultithreadedMapper引入了内部类SubMapRecordReader,SubMapRecordWriter,SubMapStatusReporter,分别继承自RecordReader,RecordWriter和StatusReporter,它们通过互斥访问MultithreadedMapper的Mapper.Context,实现了对同一份InputSplit的线程安全访问,为Mapper提供所需的Context。这些类的实现方法都很简单。


上一篇
Hadoop源代码分析(*IDs类和*Context类)及包hadoop.mapred中的MapReduce接口(5)

下一篇
Hadoop源代码分析:包org.apache.hadoop.mapreduce(7)




欢迎加入about云群371358502、39327136,云计算爱好者群,亦可关注about云腾讯认证空间||关注本站微信

没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条