分享

MapReduce 编程模型概述

本帖最后由 howtodown 于 2014-3-2 21:29 编辑

已经讲了一篇mapreduce编程模型,这里从另外一角度,感觉也不错。引用一个经典的图片来说明问题.
可以带着下面问题来阅读:
mapreduce的过程都包含什么操作?
map处理完后,tasktracer会完成什么任务?

ruducer的作用是什么?
map中经过谁的处理之后,变为reduce输入?



1. 首先, 我们能确定我们有一份输入, 而且他的数据量会很大

2. 通过split之后, 他变成了若干的分片, 每个分片交给一个Map处理

3. map处理完后, tasktracker会把数据进行复制和排序, 然后通过输出的key 和value进行 partition的划分, 并把partition相同的map输出, 合并为相同的reduce的输入.

4. ruducer通过处理, 把数据输出, 每个相同的key, 一定在一个reduce中处理完, 每一个reduce至少对应一份输出(可以通过扩展MultipleOutputFormat来得到多分输出)

5. 来看一个例子, 如下图:(来自 《hadoop权威指南》 一书)


图片2.png

  说明几点:

  5.1 输入的数据可能就是一堆文本

  5.2 mapper会解析每行数据, 然后提取有效的数据, 作为输出. 这里的例子是 从日志文件中提取每一年每天的气温, 最后会计算每年的最高气温

  5.3 map的输出就是一条一条的 key-value

  5.4 通过shuffle之后, 变成reduce的输入, 这是相同的key对应的value被组合成了一个迭代器

  5.5 reduce的任务是提取每一年的最高气温, 然后输出

二. Mapper
1. mapper可以选择性地继承 MapreduceBase这个基类, 他只是把一些方法实现了而已, 即使方法体是空的.

2. mapper必须实现 Mapper 接口(0.20以前的版本), 这是一个泛型接口, 需要执行输入和输出的key-value的类型, 这些类型通常都是Wriable接口的实现类

3. 实现map方法, 方法有四个参数, 前面两个就是输入的 Key 和 value, 第三个参数是 OuputCollector, 用于收集输出的, 第四个是reporter,用来报告一些状态的,可以用于debug

  3.1 input 默认是一行一条记录, 每天记录都放在value里边

  3.2 output  每次搜集一条 K-V记录, 一个K可以对应多个value, 在reduce 里面体现为一个 iterator

4. 覆盖 configure方法可以得到JobConf的实例, 这个JobConf是在Job运行时传递过来的, 可以跟外部资源进行数据交互

三. Reducer

1. reduce也可以选择继承 MapreduceBase这个基类, 功能跟mapper一样.

2. reducer必须实现Reducer接口, 这个接口同样是泛型接口, 意义跟Mapper的类似

3. 实现reduce方法, 这个方法也有四个参数, 第一个是输入的key, 第二个是输入的 value的迭代器, 可以遍历所有的value,相当于一个列表,
outputCollector跟map的一样, 是输出的搜集器, 每次搜集都是key-value的形式, report的作用跟map的相同.

4. 在新版本中, hadoop已经将后面两个参数合并到一个context对象里边了, 当然还会兼容就版本的 接口. >0.19.x

5. 覆盖configure方法, 作用跟map的相同

6. 覆盖close 方法,可以做一些reduce结束后的处理工作.(clean up)

四. Combiner

1. combiner的作用是, 将map的输出,先计算一遍,得到初步的合并结果, 减少reduce的计算压力.

2. combiner的编写方法跟reduce是一样的, 他本来就是一个Reducer的实现类

3. 当reducer符合函数  F(a,b) = F(F(a), F(b)) 时, combinner可以与reduce相同. 比如 sum(a,b,c,d,e,f,g) = sum(sum(a,b) ,sum(c,d,e,f) , sum(g)) 还有max, min等等.

4. 编写正确的combiner可以优化整个mapreduce程序的性能.(特别是当reduce是性能瓶颈的时候.)

5. combiner可以跟reducer不同.


五. Configuration

1. 后加的属性的值会覆盖前面定义的相同名称的属性的值.

2. 被定义为 final的属性(在属性定义中加上<final>true</final>标签)不会被后面的同名属性定义的值给覆盖.

3. 系统属性比通过资源定义的属性优先级高, 也就是通过System.setProperty()方法会覆盖在资源文件中定义的属性的值.

4. 系统属性定义必须在资源文件中有相应的定义才会生效.

5. 通过 -D 选项定义的属性, 比在资源文件中定义的属性优先级要高.

六. Run Jobs

1. 设置 inputs & output

    1.1 先判断输入是否存在 (不存在会导致出错,最好利用程序来判断.)
    1.2 判断输出是否已经存在(存在也会导致出错)
    1.3 养成一种好的习惯(先判断,再执行)

2. 设置 mapper、reducer、combiner. 各个实现类的class对象.  XXXX.class

3. 设置 inputformat & outputformat & types

    3.1 input和output format都有两种, 一种是 textfile, 一种是sequencefile. 简单理解, textfile是文本组织的形式,sequence file是 二进制组织的形式.

    3.2 Types的设置, 根据输入和输出的数据类型, 设置各种Writable接口的实现类的class对象.

4. 设置reduce count

    4.1 reduce count可以为0, 当你的数据无需reduce的时候.

    4.2 reduce数量最好稍微少于当前可用的slots的数量, 这样reduce就能在一波计算中算好. (一个slot可以理解为一个计算单元(资源).)

七. 其他的一些细节.

1. ChainMapper可以实现链式执行mapper 他本身就是一个Mapper的实现类. 提供了一个addMapper的方法.

2. ChainReducer 跟ChainMapper类似, 可以实现链式执行reducer, 他是Reducer的实现类.

3. 多个job先后运行, 可以通过先后执行 JobClient.runJob方法来实现先后顺序

4. 扩展MultipleOutputFormat接口, 可以实现一个reduce对应多份输出 (而且可以指定文件名哦)

5. Partitioner 接口用于将 Map的输出结果进行分区, 分区相同的key对应的数据会被同一个reducer处理

    5.1 提供了一个接口方法: public int getPartition(K2 key, V2 value, int numReduceTasks)

    5.2 可以自己定义, 根据key的某些特指来划分, 也可以根据value的某些特质来划分.

    5.3 numReduceTasks就是设置的reduce的个数.一般返回的partition的值应该都小于这个值.(%)

6. reporter的作用

    6.1 reporter.incrCounter(key, amount). 比如对数据计算是, 一些不合规范的脏数据, 我们可以通过counter来记录有多少

    6.2 reporter.setStatus(status); 方法可以设置一条状态消息, 当我们发现job运行出现这条消息是, 说明出现了我们预期的(错误或者正确)的情况, 用于debug.

    6.3 reporter.progress(), 像mapreduce框架报告当前运行进度. 这个progress可以起到心跳的作用. 一个task要是超过10分钟没有想mapreduce框架报告情况, 这个reduce会被kill掉. 当你的任务处理会比较旧是, 最好定时向mapreduce汇报你的状态.

7. 通过实现Wriable接口, 我们可以自定义key和value的类型, 使用起来就像pojo, 不需要每次都进行parse. 如果你的自定义类型是Key的类型, 则需要同时实现Comparable 接口, 用于排序. 比如MapWritable就是一个例子.





本帖被以下淘专辑推荐:

已有(26)人评论

跳转到指定楼层
pig2 发表于 2014-3-2 21:32:23
个人认为,仅供参考:


mapreduce的过程都包含什么操作?
排序复制合并

map处理完后,tasktracer会完成什么任务?
tasktracker会把数据进行复制和排序, 然后通过输出的key 和value进行 partition的划分, 并把partition相同的map输出, 合并为相同的reduce的输入.

ruducer的作用是什么?
把数据输出, 每个相同的key, 一定在一个reduce中处理完, 每一个reduce至少对应一份输出

map中经过谁的处理之后,变为reduce输入?
通过shuffle之后, 变成reduce的输入, 这是相同的key对应的value被组合成了一个迭代器

回复

使用道具 举报

ascentzhen 发表于 2014-7-18 13:56:28
在此基础上看源码会有比较大的进步
回复

使用道具 举报

yizuoming 发表于 2014-9-3 21:25:01
谢谢分享,很详细,学到很多
回复

使用道具 举报

kanaka10 发表于 2014-9-29 14:34:02
不错 学习了
回复

使用道具 举报

szcountryboy 发表于 2014-10-29 11:10:09
一个文件如果分片的话,是否会造成数据混乱? 假如一个日志文件,一条数据由多行组成(头、数据1-N行、尾,用户开始删除标记、删除明细内容,结束),如果这样在split后,一条完整的数据是否会被分到两个名多相map里面?如果分到多个map里面,如何保证数据的完整性?帮忙解惑下,谢谢。
回复

使用道具 举报

admin 发表于 2014-10-29 11:30:20
szcountryboy 发表于 2014-10-29 11:10
一个文件如果分片的话,是否会造成数据混乱? 假如一个日志文件,一条数据由多行组成(头、数据1-N行、尾, ...
参考这个帖子Hadoop MapReduce中如何处理行Block和inputSplit
回复

使用道具 举报

szcountryboy 发表于 2014-10-29 11:41:50
非常感谢回复,那个贴子只是说一行内容超长被split,然后按照换行符来处理。
我说的是正常截断,假如日志
delete_start
detail1
detail2
detailn
delete_end
日志内容都是一行一行存储,每行结尾都有换行符。我们现在忽略行内容截断情况,如果split到detail2这里,detail2前放到split1,detail2放到split2
这样在做map数据过滤的时候,数据是否会乱掉?
回复

使用道具 举报

bioger_hit 发表于 2014-10-29 14:27:03
szcountryboy 发表于 2014-10-29 11:41
非常感谢回复,那个贴子只是说一行内容超长被split,然后按照换行符来处理。
我说的是正常截断,假如日志
...
一个输入文件或则一个mapreduce只有一个split,不会产生第二个,即使是两个也是对整体数据的分割,不会产生你说的这个情况
回复

使用道具 举报

szcountryboy 发表于 2014-10-29 14:48:04
本帖最后由 szcountryboy 于 2014-10-29 14:51 编辑

日志内容都是一行一行存储,每行结尾都有换行符。我们现在忽略行内容截断情况,如果split到detail2这里,detail2前放到split1,detail2放到split2
这样在做map数据过滤的时候,数据是否会乱掉?

split会将input file根据大小分割给多个map来处理,这点应该不会错吧。

这里说的split1,split2 指的是分割后处理数据的map
delete_start
detail1        ---------------------------->分割到map1
detail2
detailn       ----------------------------->分割到map2
delete_end------------------------------>分割到map3

如上面的日志数据,如何保证从start到end,100%会分到一个map里,也就是不会拆散数据。
回复

使用道具 举报

123下一页
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条