分享

新手指导:编写mapreduce参考实例注释

admin 2014-4-10 21:14:54 发表于 实操演练 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 6 16110



对于新手,经常会遇到代码中
不知道代码的到底做了什么?
,对于mapreduce新手亦是如此,下面给大家一个参考:







  1. package secondarySort;
  2. import java.io.DataInput;
  3. import java.io.DataOutput;
  4. import java.io.IOException;
  5. import java.util.StringTokenizer;
  6. import org.apache.hadoop.conf.Configuration;
  7. import org.apache.hadoop.fs.Path;
  8. import org.apache.hadoop.io.IntWritable;
  9. import org.apache.hadoop.io.LongWritable;
  10. import org.apache.hadoop.io.Text;
  11. import org.apache.hadoop.io.WritableComparable;
  12. import org.apache.hadoop.io.WritableComparator;
  13. import org.apache.hadoop.mapreduce.Job;
  14. import org.apache.hadoop.mapreduce.Mapper;
  15. import org.apache.hadoop.mapreduce.Partitioner;
  16. import org.apache.hadoop.mapreduce.Reducer;
  17. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  18. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
  19. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  20. import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
  21. public class SecondarySort {
  22. //自己定义的key类应该实现WritableComparable接口
  23. public static class IntPair implements WritableComparable<IntPair> {
  24. int first;
  25. int second;
  26. /**
  27. * Set the left and right values.
  28. */
  29. public void set(int left, int right) {
  30. first = left;
  31. second = right;
  32. }
  33. public int getFirst() {
  34. return first;
  35. }
  36. public int getSecond() {
  37. return second;
  38. }
  39. @Override
  40. //反序列化,从流中的二进制转换成IntPair
  41. public void readFields(DataInput in) throws IOException {
  42. // TODO Auto-generated method stub
  43. first = in.readInt();
  44. second = in.readInt();
  45. }
  46. @Override
  47. //序列化,将IntPair转化成使用流传送的二进制
  48. public void write(DataOutput out) throws IOException {
  49. // TODO Auto-generated method stub
  50. out.writeInt(first);
  51. out.writeInt(second);
  52. }
  53. @Override
  54. //key的比较
  55. public int compareTo(IntPair o) {
  56. // TODO Auto-generated method stub
  57. if (first != o.first) {
  58. return first < o.first ? -1 : 1;
  59. } else if (second != o.second) {
  60. return second < o.second ? -1 : 1;
  61. } else {
  62. return 0;
  63. }
  64. }
  65. //新定义类应该重写的两个方法
  66. @Override
  67. //The hashCode() method is used by the HashPartitioner (the default partitioner in MapReduce)
  68. public int hashCode() {
  69. return first * 157 + second;
  70. }
  71. @Override
  72. public boolean equals(Object right) {
  73. if (right == null)
  74. return false;
  75. if (this == right)
  76. return true;
  77. if (right instanceof IntPair) {
  78. IntPair r = (IntPair) right;
  79. return r.first == first && r.second == second;
  80. } else {
  81. return false;
  82. }
  83. }
  84. }
  85. /**
  86. * 分区函数类。根据first确定Partition。
  87. */
  88. public static class FirstPartitioner extends Partitioner<IntPair,IntWritable>{
  89. @Override
  90. public int getPartition(IntPair key, IntWritable value,
  91. int numPartitions) {
  92. return Math.abs(key.getFirst() * 127) % numPartitions;
  93. }
  94. }
  95. /**
  96. * 分组函数类。只要first相同就属于同一个组。
  97. */
  98. /*//第一种方法,实现接口RawComparator
  99. public static class GroupingComparator implements RawComparator<IntPair> {
  100. @Override
  101. public int compare(IntPair o1, IntPair o2) {
  102. int l = o1.getFirst();
  103. int r = o2.getFirst();
  104. return l == r ? 0 : (l < r ? -1 : 1);
  105. }
  106. @Override
  107. //一个字节一个字节的比,直到找到一个不相同的字节,然后比这个字节的大小作为两个字节流的大小比较结果。
  108. public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2){
  109. // TODO Auto-generated method stub
  110. return WritableComparator.compareBytes(b1, s1, Integer.SIZE/8,
  111. b2, s2, Integer.SIZE/8);
  112. }
  113. }*/
  114. //第二种方法,继承WritableComparator
  115. public static class GroupingComparator extends WritableComparator {
  116. protected GroupingComparator() {
  117. super(IntPair.class, true);
  118. }
  119. @Override
  120. //Compare two WritableComparables.
  121. public int compare(WritableComparable w1, WritableComparable w2) {
  122. IntPair ip1 = (IntPair) w1;
  123. IntPair ip2 = (IntPair) w2;
  124. int l = ip1.getFirst();
  125. int r = ip2.getFirst();
  126. return l == r ? 0 : (l < r ? -1 : 1);
  127. }
  128. }
  129. // 自定义map
  130. public static class Map extends
  131. Mapper<LongWritable, Text, IntPair, IntWritable> {
  132. private final IntPair intkey = new IntPair();
  133. private final IntWritable intvalue = new IntWritable();
  134. public void map(LongWritable key, Text value, Context context)
  135. throws IOException, InterruptedException {
  136. String line = value.toString();
  137. StringTokenizer tokenizer = new StringTokenizer(line);
  138. int left = 0;
  139. int right = 0;
  140. if (tokenizer.hasMoreTokens()) {
  141. left = Integer.parseInt(tokenizer.nextToken());
  142. if (tokenizer.hasMoreTokens())
  143. right = Integer.parseInt(tokenizer.nextToken());
  144. intkey.set(left, right);
  145. intvalue.set(right);
  146. context.write(intkey, intvalue);
  147. }
  148. }
  149. }
  150. // 自定义reduce
  151. //
  152. public static class Reduce extends
  153. Reducer<IntPair, IntWritable, Text, IntWritable> {
  154. private final Text left = new Text();
  155. private static final Text SEPARATOR =
  156. new Text("------------------------------------------------");
  157. public void reduce(IntPair key, Iterable<IntWritable> values,
  158. Context context) throws IOException, InterruptedException {
  159. context.write(SEPARATOR, null);
  160. left.set(Integer.toString(key.getFirst()));
  161. for (IntWritable val : values) {
  162. context.write(left, val);
  163. }
  164. }
  165. }
  166. /**
  167. * @param args
  168. */
  169. public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
  170. // TODO Auto-generated method stub
  171. // 读取hadoop配置
  172. Configuration conf = new Configuration();
  173. // 实例化一道作业
  174. Job job = new Job(conf, "secondarysort");
  175. job.setJarByClass(SecondarySort.class);
  176. // Mapper类型
  177. job.setMapperClass(Map.class);
  178. // 不再需要Combiner类型,因为Combiner的输出类型<Text, IntWritable>对Reduce的输入类型<IntPair, IntWritable>不适用
  179. //job.setCombinerClass(Reduce.class);
  180. // Reducer类型
  181. job.setReducerClass(Reduce.class);
  182. // 分区函数
  183. job.setPartitionerClass(FirstPartitioner.class);
  184. // 分组函数
  185. job.setGroupingComparatorClass(GroupingComparator.class);
  186. // map 输出Key的类型
  187. job.setMapOutputKeyClass(IntPair.class);
  188. // map输出Value的类型
  189. job.setMapOutputValueClass(IntWritable.class);
  190. // rduce输出Key的类型,是Text,因为使用的OutputFormatClass是TextOutputFormat
  191. job.setOutputKeyClass(Text.class);
  192. // rduce输出Value的类型
  193. job.setOutputValueClass(IntWritable.class);
  194. // 将输入的数据集分割成小数据块splites,同时提供一个RecordReder的实现。
  195. job.setInputFormatClass(TextInputFormat.class);
  196. // 提供一个RecordWriter的实现,负责数据输出。
  197. job.setOutputFormatClass(TextOutputFormat.class);
  198. // 输入hdfs路径
  199. FileInputFormat.setInputPaths(job, new Path(args[0]));
  200. // 输出hdfs路径
  201. FileOutputFormat.setOutputPath(job, new Path(args[1]));
  202. // 提交job
  203. System.exit(job.waitForCompletion(true) ? 0 : 1);
  204. }
  205. }
复制代码


说明:这个例子中定义的map和reduce如下,关键是它对输入输出类型的定义:(java泛型编程)
public static class Map extends Mapper<LongWritable, Text, IntPair, IntWritable>
public static class Reduce extends Reducer<IntPair, NullWritable, IntWritable, IntWritable>

-----------------------------------------------------------------------------------------------------------------------如果想更深一步了解,可以看如下内容:

1 首先说一下工作原理:

在map阶段,使用job.setInputFormatClass定义的InputFormat将输入的数据集分割成小数据块splites,同时InputFormat提供一个RecordReder的实现。本例子中使用的是TextInputFormat,他提供的RecordReder会将文本的一行的行号作为key,这一行的文本作为value。这就是自定义Map的输入是<LongWritable, Text>的原因。然后调用自定义Map的map方法,将一个个<LongWritable, Text>对输入给Map的map方法。注意输出应该符合自定义Map中定义的输出<IntPair, IntWritable>。最终是生成一个List<IntPair, IntWritable>。在map阶段的最后,会先调用job.setPartitionerClass对这个List进行分区,每个分区映射到一个reducer。每个分区内又调用job.setSortComparatorClass设置的key比较函数类排序。可以看到,这本身就是一个二次排序。如果没有通过job.setSortComparatorClass设置key比较函数类,则使用key的实现的compareTo方法。在第一个例子中,使用了IntPair实现的compareTo方法,而在下一个例子中,专门定义了key比较函数类。
在reduce阶段,reducer接收到所有映射到这个reducer的map输出后,也是会调用job.setSortComparatorClass设置的key比较函数类对所有数据对排序。然后开始构造一个key对应的value迭代器。这时就要用到分组,使用jobjob.setGroupingComparatorClass设置的分组函数类。只要这个比较器比较的两个key相同,他们就属于同一个组,它们的value放在一个value迭代器,而这个迭代器的key使用属于同一个组的所有key的第一个key。最后就是进入Reducer的reduce方法,reduce方法的输入是所有的(key和它的value迭代器)。同样注意输入与输出的类型必须与自定义的Reducer中声明的一致。

2  二次排序就是首先按照第一字段排序,然后再对第一字段相同的行按照第二字段排序,注意不能破坏第一次排序的结果 。

例如
输入文件
20 21
50 51
50 52
50 53
50 54
60 51
60 53
60 52
60 56
60 57
70 58
60 61
70 54
70 55
70 56
70 57
70 58
1 2
3 4
5 6
7 82
203 21
50 512
50 522
50 53
530 54
40 511
20 53
20 522
60 56
60 57
740 58
63 61
730 54
71 55
71 56
73 57
74 58
12 211
31 42
50 62
7 8
输出:(注意需要分割线)
------------------------------------------------
1       2
------------------------------------------------
3       4
------------------------------------------------
5       6
------------------------------------------------
7       8
7       82
------------------------------------------------
12      211
------------------------------------------------
20      21
20      53
20      522
------------------------------------------------
31      42
------------------------------------------------
40      511
------------------------------------------------
50      51
50      52
50      53
50      53
50      54
50      62
50      512
50      522
------------------------------------------------
60      51
60      52
60      53
60      56
60      56
60      57
60      57
60      61
------------------------------------------------
63      61
------------------------------------------------
70      54
70      55
70      56
70      57
70      58
70      58
------------------------------------------------
71      55
71      56
------------------------------------------------
73      57
------------------------------------------------
74      58
------------------------------------------------
203     21
------------------------------------------------
530     54
------------------------------------------------
730     54
------------------------------------------------
740     58

3  具体步骤:

(1)自定义key
在mr中,所有的key是需要被比较和排序的,并且是二次,先根据partitione,再根据大小。而本例中也是要比较两次。先按照第一字段排序,然后再对第一字段相同的按照第二字段排序。根据这一点,我们可以构造一个复合类IntPair,他有两个字段,先利用分区对第一字段排序,再利用分区内的比较对第二字段排序。
所有自定义的key应该实现接口WritableComparable,因为是可序列的并且可比较的。并重载方法:


  1. //反序列化,从流中的二进制转换成IntPair
  2. public void readFields(DataInput in) throws IOException        
  3. //序列化,将IntPair转化成使用流传送的二进制
  4. public void write(DataOutput out)
  5. //key的比较
  6. public int compareTo(IntPair o)        
  7. //另外新定义的类应该重写的两个方法
  8. //The hashCode() method is used by the HashPartitioner (the default partitioner in MapReduce)
  9. public int hashCode()
  10. public boolean equals(Object right)
复制代码

(2)由于key是自定义的,所以还需要自定义一下类:

(2.1)分区函数类。这是key的第一次比较。

  1. public static class FirstPartitioner extends Partitioner<IntPair,IntWritable>
复制代码

在job中使用setPartitionerClasss设置Partitioner。

(2.2)key比较函数类。这是key的第二次比较。这是一个比较器,需要继承WritableComparator。

  1. public static class KeyComparator extends WritableComparator
复制代码
必须有一个构造函数,并且重载 public int compare(WritableComparable w1, WritableComparable w2)另一种方法是 实现接口RawComparator。
在job中使用setSortComparatorClass设置key比较函数类。

(2.3)分组函数类。在reduce阶段,构造一个key对应的value迭代器的时候,只要first相同就属于同一个组,放在一个value迭代器。这是一个比较器,需要继承WritableComparator。


  1. public static class GroupingComparator extends WritableComparator
复制代码

分组函数类也必须有一个构造函数,并且重载 public int compare(WritableComparable w1, WritableComparable w2)
分组函数类的另一种方法是实现接口RawComparator。
在job中使用setGroupingComparatorClass设置分组函数类。

另外注意的是,如果reduce的输入与输出不是同一种类型,则不要定义Combiner也使用reduce,因为Combiner的输出是reduce的输入。除非重新定义一个Combiner。


3 代码

上面实例例子中没有使用key比较函数类,而是使用key的实现的compareTo方法。





本帖被以下淘专辑推荐:

已有(6)人评论

跳转到指定楼层
hyp 发表于 2014-4-17 22:09:28
楼上深入浅出,非常不错,写的非常好,我是初学者,我仔细看了,基本上能理解了,不过也有点问题,呵呵,希望给予解答,问题如下:
问题1:在工作原理那里,楼上有一句话是这样的:“可以看到,这本身就是一个二次排序。”,这句话的意思是先进行分区,这里的分区本身可以看成对所有的<k,v>对进行了划分,这里可以看成一次排序,然后再对分区里面的<k,v>进行了排序,所有称为二次排序,不知道我的理解是否是对的?

问题2:楼上工作原理那里,说map阶段最后进行了分区,并且对分区进行了排序,然后在reducer阶段开始时又对分区进行了排序,map阶段最后已经对分区进行了排序,为什么这里又要对分区排序,这里不是重复了吗?

问题3:job.setCombinerClass(Reduce.class); 如果不使用这句,MR框架会自动为我们设置一个默认的Combiner类,并且在其中进行合并(默认是不合并中间结果)?,Combiner处理是发生在Map之后和reduce之前,并且其作用是对map的输出进行进一步处理,以便输出到reduce?
回复

使用道具 举报

sstutu 发表于 2014-4-17 22:46:49
1.首先第一个问题,我的理解,跟你的是一样的。
2.对于排序,这些都是由编程者来掌控的,如果你想排序,就可以排序,他们调用的函数都是一样的job.setSortComparatorClass。
3.Combiner,规约的目的是便于传输。

回复

使用道具 举报

hyp 发表于 2014-4-18 11:13:32
恩恩,还有一个问题想要请教,就是reduce那里,倘若有两个分区的数据进行处理(每个分区对于一定范围的key,该分区对应一个reduce),假设分区数据如下:
分区1:( 2, <3,5> )  ( 5, <2,3> )
分区2:( 3, <1,2> )   )
那么在reduce函数中当对某个分区进行处理时,假设对分区1进行处理,执行context.write(left, val);后,结果为:
( 2, 3 )
(2, 5  )
( 5, 2 )
( 5, 3 )
之后同时可能在另外一个机器上运行另外一个reducer对分区2进行处理,或在本机器完成对分区1的处理后,又对分区2进行reduce,执行结果如下
( 3, 1 )
( 3, 2 )
最终的输出结果为:
( 2, 3 )
(2, 5  )
( 3, 1 )
( 3, 2 )
( 5, 2 )
( 5, 3 )
我的问题来了,虽然单个分区处理后是有序的,那么为什么最终的输出结果是有序的? 即部分有序怎么保证整体的输出结果有序?难道它最后会对每个分区处理后的结果再次排序得到最终的结果?
回复

使用道具 举报

admin 发表于 2014-4-18 11:38:48
你可以参考下面:
在reduce阶段,reducer接收到所有映射到这个reducer的map输出后,也是会调用job.setSortComparatorClass设置的key比较函数类对所有数据对排序。然后开始构造一个key对应的value迭代器。这时就要用到分组,使用job.setGroupingComparatorClass设置的分组函数类。只要这个比较器比较的两个key相同,他们就属于同一个组,它们的value放在一个value迭代器,而这个迭代器的key使用属于同一个组的所有key的第一个key。最后就是进入Reducer的reduce方法,reduce方法的输入是所有的(key和它的value迭代器)。同样注意输入与输出的类型必须与自定义的Reducer中声明的一致。

reduce也有一个排序的过程。

对于整个mapreduce编程来说:
举一个例子:
我们操作数据库,数据库的数据有自己的一个顺序。但是我们想让查询出来的数据,也就是输出来的数据,按照我们的意愿查询出来。比如排序,分组,升序,降序等等。
同理:mapreduce中也是这种情况。
只不过mapreduce里面有一些系统默认功能,比如分区,采用的是哈希算法,排序如果不自定义,采用的是默认排序。
但是这些我们都可以自定义。

回复

使用道具 举报

檬檬檬檬檬 发表于 2015-9-2 10:23:25
新手指导:编写mapreduce参考实例注释
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条