分享

HBase MapReduce 二次排序Secondary Sort

1.MapReduce不适用那些数据?
2.MapReduce如何降低实现的复杂度?
3.Map端和Reduce端是否都有Shuffle过程?

欢迎加入about云群371358502、39327136,云计算爱好者群,亦可关注about云腾讯认证空间||关注本站微信

已有(2)人评论

跳转到指定楼层
xng2012 发表于 2014-4-10 16:50:06
本帖最后由 xng2012 于 2014-4-10 16:54 编辑
MapReduce是Hadoop中处理大数据的方法,是一个处理大数据的简单算法、编程泛型。虽然思想简单,但其实真正用起来还是有很多问题,不是所有的问题都可以像WordCount那样典型和直观, 有很多需要trick的地方。MapReduce的中心思想是分而治之,数据要松耦合,可以划分为小数据集并行处理,如果数据本身在计算上存在很强的依赖关系,就不要赶鸭子上架,用MapReduce了。

MapReduce编程中,最重要的是要抓住Map和Reduce的input和output,好的input和output可以降低实现的复杂度。最近,写了很多关于MapReduce的job,有倒排索引,统计,排序等。其中,对排序花费了一番功夫,MapReduce做WordCount很好理解,
Map input: , output: [word, 1],
Reduce input: [word, 1], output: [word, totalcount],还可以设置Combiner进行优化。

但排序不同了,大量的文件记录,分配给map,然后reduce出来的文件分布在各个机器上,怎么保证有序呢?排序是算法中常考常用的,MapReduce做排序还需要理解一下MapReduce过程中,非常magic的过程Shuffle and Sort.

Shuffle and Sort过程解析
1416uzt.jpg
如上图,Shuffle的过程包括了Map端和Reduce端。

Map端
j0ftw0.jpg

  • Input Split分配给Map
  • Map进行计算,输出[key, value]形式的output
  • Map的输出结果缓存在内存里
  • 内存中进行Partition,默认是HashPartitioner(采用取模hash (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks), 目的是将map的结果分给不同的reducer,有几个Partition,就有几个reducer,partition的数目可以在job启动时通过参数 “-Dmapreduc.job.reduces”设置(Hadoop 2.2.0), HashPartitioner可以让map的结果均匀的分给不同机器上的reducer,保证负载均衡。
  • 内存中在Partition结束后,会对每个Partition中的结果按照key进行排序。
  • 排序结束后,相同的key在一起了,如果设置过combiner,就合并数据,减少写入磁盘的记录数
  • 当内存中buffer(default 100M)达到阈值(default 80%),就会把记录spill(即溢写)到磁盘中,优化Map时可以调大buffer的阈值,缓存更多的数据。
  • 当磁盘中的spill文件数目在3(min.num.spills.for.combine)个(包括)以上, map的新的output会再次运行combiner,而如果磁盘中spill file文件就1~2个,就没有必要调用combiner,因为combiner大多数情况和reducer是一样的逻辑,可以在reduer端再计算。
  • Map结束时会把spill出来的多个文件合并成一个,merge过程最多10(默认)个文件同时merge成一个文件,多余的文件分多次merge,merge过程是merge sort的算法。
  • Map端shuffle完毕,数据都有序的存放在磁盘里,等待reducer来拿取

Reducer端
shuffle and sort的过程不仅仅在map端,别忘了reducer端还没拿数据呢,reduce job当然不能开启。
  • Copy phase: reducer的后台进程(default 5个)到被Application Master (Hadoop 2.2), 或者之前的JobTracker 指定的机器上将map的output拷贝到本地,先拷贝到内存,内存满了就拷贝的磁盘。
  • Sort phase(Merge phase): Reduer采用merge sort,将来自各个map的data进行merge, merge成有序的更大的文件。
  • 如果设置过Combiner,merge过程可能会调用Combiner,调不调用要看在磁盘中产生的文件数目是否超过了设定的阈值。(这一点我还没有确认,但Combiner在Reducer端是可能调用。)
  • Reduce phase: reduce job开始,输入是shuffle sort过程merge产生的文件。


MapReduce排序(Secondary Sort)案例

MapReduce排序(Secondary Sort)案例理解了Shuffle的过程后,我们可以着手开始做排序了。

1. 需求

现在我在HBase里面,存储了10万条文献的评论统计记录,每个文献的评论数目是2~20,每个文献评论统计记录在HBase的存储示例如下:
即表 “pb_stat_comments_count”的记录
[rowkey, column family, column qualifer, value] =
[literature row key, 'info', 'count', 12]
[literature row key, 'info', 'avgts', 1400815843854]
count:是每个文献的记录
avgts:是所有文献的平均评价时间戳
排序要求:将文献按照评价次数逆序排,次数大的靠前,次数相同的平均评价时间相同的靠前。


2. 思想

MapReduce在Map端排序时,都只按key排序,而现在我们的排序指标有两个字段count, avgts,组合的key,为了保证排序结果对于第二个字段有序,MapReduce里面叫做Secondary Sort,就是个名称而已,可以扩展为保证第三个字段有序,第四,第五…
  • 首先,我们需要重新自定义一个组合Key,即我们将看到的示例中的SortKeyPair类, 包括count和avgts两个字段,并overwrite里面的compare to方法,按我们的需求,count大的靠前,count相同,avgts大的靠前。
  • 其次,利用SortKeyPair,自定义一个SortComparatorClass给Map端排序时用。
  • 再次,自定义PartitionClass.
    我们单机伪分布式下,默认只有一个Partition,一个Reducer,Partition有无均可,但是分布式环境下,我们要求分布在各个reducer上的结果有序,即评论次数2~5的在reducer0上,6~10的在reducer 1上,11~15在reducer2上,16~20在reducer3上,如果采用取模hash,会造成各个reducer的结果顺序无法控制,因此我们不能用HashPartitioner。而自己的Partitioner则需要只按count进行partition,不管平均评价时间,我们这里把partition的主要的这个key叫做NaturalKey.
  • 最后,我们可以看到reduce端要进行merge,merge过程中,我们需要把相同的count分做一组,需要自定义GroupingComparatorClass。
    否则如果是按整个组合key进行分组,每个组合key都是不同的,不能分到一个reduce调用中,reduce方法会被调用10万次(单机环境,一个reducer拿到所有10万条文献), 而如果按count分组,文献评论数目2~20,reduce方法只会被调用最多19次,减少了开销。
    我们可以看到一个Reducer实例中,reduce方法会被调用多次,按道理是调用一次,但是因为Reducer过程会把数据分成多个组,reduce调用多次是可能的。(具体的需要再看看源码)

案例代码

1. 自定义组合key类:SortKeyPair.java
  1. <font color="#000000">public class SortKeyPair implements WritableComparable<SortKeyPair> {
  2.     private int count = 0;
  3.     private long avgts = 0;
  4.         //要写一个默认构造函数,否则MapReduce的反射机制,无法创建该类报错
  5.     public SortKeyPair() {
  6.     }
  7.     /**
  8.      *
  9.      * @param count
  10.      * @param timestamp
  11.      *            Average timestamp
  12.      */
  13.     public SortKeyPair(int count, long avgts) {
  14.         super();
  15.         this.count = count;
  16.         this.avgts = avgts;
  17.     }
  18.     public int getCount() {
  19.         return count;
  20.     }
  21.     public void setCount(int count) {
  22.         this.count = count;
  23.     }
  24.     public long getAvgts() {
  25.         return avgts;
  26.     }
  27.     public void setAvgts(long avgts) {
  28.         this.avgts = avgts;
  29.     }
  30.     @Override
  31.     public void write(DataOutput out) throws IOException {
  32.         out.writeInt(this.count);
  33.         out.writeLong(this.avgts);
  34.     }
  35.     @Override
  36.     public void readFields(DataInput in) throws IOException {
  37.         this.count = in.readInt();
  38.         this.avgts = in.readLong();
  39.     }
  40.     /**
  41.      * We want sort in descending count and descending avgts, Java里面排序默认小的放前面,即返回-1的放前面,这里直接把小值返回1,就会被排序到后面了。
  42.      */
  43.     @Override
  44.     public int compareTo(SortKeyPair o) {
  45.         int res = this.count < o.getCount() ? 1
  46.                 : (this.count == o.getCount() ? 0 : -1);
  47.         if (res == 0) {
  48.             res = this.avgts < o.getAvgts() ? 1
  49.                     : (this.avgts == o.getAvgts() ? 0 : -1);
  50.         }
  51.         return res;
  52.     }
  53.         //这个方法需要Overrride
  54.     @Override
  55.     public int hashCode() {
  56.         return Integer.MAX_VALUE - this.count;
  57.     }
  58.     @Override
  59.     public String toString() {
  60.         return this.count + "," + this.avgts;
  61.     }
  62.         // 这个方法,写不写都不会影响的,至少我测的是这样
  63.     @Override
  64.     public boolean equals(Object obj) {
  65.         if (obj == null) {
  66.             return false;
  67.         }
  68.         if (this == obj) {
  69.             return true;
  70.         }
  71.         if (obj instanceof SortKeyPair) {
  72.             SortKeyPair s = (SortKeyPair) obj;
  73.             return this.count == s.getCount() && this.avgts == s.getAvgts();
  74.         } else {
  75.             return false;
  76.         }
  77.     }
  78. }</font>
复制代码

2. 自定义SortComparatorClass, 我命名为:CompositeKeyComparator.java
  1. <font color="#000000">public class CompositeKeyComparator extends WritableComparator{
  2.     public CompositeKeyComparator () {
  3.         super(SortKeyPair.class, true);
  4.     }
  5.      
  6.     @Override
  7.     public int compare(WritableComparable a, WritableComparable b) {
  8.         SortKeyPair s1 = (SortKeyPair)a;
  9.         SortKeyPair s2 = (SortKeyPair)b;
  10.          
  11.         return s1.compareTo(s2);
  12.     }
  13.      
  14. }</font>
复制代码

3. 自定义PartitionerClass, 我命名为NaturalKeyPartitioner.java
  1. <font color="#000000">public class NaturalKeyPartitioner extends Partitioner<SortKeyPair, Text>{
  2.     @Override
  3.     public int getPartition(SortKeyPair key, Text value, int numPartitions) {
  4.         // % is hash partition, can't make sure bigger count go to reducer with small id.
  5.         int count = key.getCount();
  6.         if (count <= 5) {
  7.             return 0;
  8.         } else if (count > 5 && count <= 10) {
  9.             return 1;
  10.         } else if (count > 10 && count <= 15) {
  11.             return 2;
  12.         } else {
  13.             return 3;
  14.         }
  15.     }
  16. }</font>
复制代码

4. 自定义GroupingComparatorClass,
我命名为NaturalKeyGroupComparator.java
  1. <font color="#000000">public class NaturalKeyGroupComparator extends WritableComparator {
  2.     public NaturalKeyGroupComparator() {
  3.         super(SortKeyPair.class, true);
  4.     }
  5.     @Override
  6.     public int compare(WritableComparable a, WritableComparable b) {
  7.         SortKeyPair s1 = (SortKeyPair) a;
  8.         SortKeyPair s2 = (SortKeyPair) b;
  9.         int res = s1.getCount() < s2.getCount() ? 1 : (s1.getCount() == s2
  10.                 .getCount() ? 0 : -1);
  11.         return res;
  12.     }
  13. }</font>
复制代码

5. 定义Mapper,Reducer,并定义Job提交
  1. <font color="#000000">public class SecondarySort {
  2.     /**
  3.      * It must be declared static, in case of reflection error
  4.      *
  5.      * @author lgrcyanny
  6.      *
  7.      */
  8.     public static class SortMapper extends TableMapper<SortKeyPair, Text> {
  9.                  
  10.                 // Map input [hbase row key, hbase result], output: [SortKeyPair, Text] (Text中包括了我们最后输出到文件的信息:literature row key, count, avgts)
  11.         @Override
  12.         protected void map(ImmutableBytesWritable key, Result rs,
  13.                 Context context) throws IOException, InterruptedException {
  14.             String literature = Bytes.toString(rs.getRow());
  15.             int count = Integer.valueOf(Bytes.toString(rs.getValue(
  16.                     Bytes.toBytes("info"), Bytes.toBytes("count"))));
  17.             long avgts = Long.valueOf(Bytes.toString(rs.getValue(
  18.                     Bytes.toBytes("info"), Bytes.toBytes("avgts"))));
  19.             context.write(new SortKeyPair(count, avgts), new Text(literature
  20.                     + "," + count + "," + avgts));
  21.         }
  22.     }
  23.     public static class SortReducer extends
  24.             Reducer<SortKeyPair, Text, Text, Text> {
  25.         /**
  26.          * Now the key is max SortKeyPair in the list, we just dump the ordered
  27.          * items
  28.                  * Reduce input: [SorKeyPair, list of text], Output: 1
  29.          */
  30.         @Override
  31.         protected void reduce(SortKeyPair key, Iterable<Text> items,
  32.                 Context context) throws IOException, InterruptedException {
  33.             Iterator<Text> iterator = items.iterator();
  34.             while (iterator.hasNext()) {
  35.                 context.write(null, iterator.next());// 因为value中已经包括了需要输出的信息,SortKeyPair的信息不需要输出,key设置为null即可。
  36.             }
  37.         }
  38.     }
  39.     public static void main(String[] args) throws IOException,
  40.             ClassNotFoundException, InterruptedException {
  41.                 // 因为数据源是HBase,因此需要使用HBase中的MapReduce启动设置,可以参考HBase官方网站
  42.                 // 自己做测试,可以改成文件输入,看具体需求而定
  43.         Configuration conf = HBaseConfiguration.create();
  44.         Job job = new Job(conf, "Secondarysort");
  45.         job.setJarByClass(SecondarySort.class);
  46.         Scan scan = new Scan();
  47.         scan.addFamily(Bytes.toBytes("info"));
  48.         scan.setCaching(5000); // Default is 1, set 500 improve performance
  49.         scan.setCacheBlocks(false); // Close block cache for MR job
  50.         TableMapReduceUtil.initTableMapperJob("pb_stat_comments_count", scan,
  51.                 SortMapper.class, SortKeyPair.class, Text.class, job);
  52.         job.setReducerClass(SortReducer.class);
  53.         // For secondary sort, 这里设置自定义排序的三个类
  54.         job.setSortComparatorClass(CompositeKeyComparator.class);
  55.         job.setPartitionerClass(NaturalKeyPartitioner.class);
  56.         job.setGroupingComparatorClass(NaturalKeyGroupComparator.class);
  57.         job.setOutputKeyClass(Text.class);
  58.         job.setOutputValueClass(Text.class);
  59.                  
  60.                 // Reducer的输出,设置为文件输出
  61.         FileOutputFormat.setOutputPath(job, new Path("secondary-sort-res"));
  62.         long start = System.currentTimeMillis();
  63.         boolean res = job.waitForCompletion(true);
  64.         long end = System.currentTimeMillis();
  65.         if (res) {
  66.             System.out.println("Job done with time " + (end - start));
  67.         } else {
  68.             throw new IOException("Job exit with error.");
  69.         }
  70.     }
  71. }</font>
复制代码

总结

MapReduce做排序,麻烦的不是写Mapper和Reducer,而是自定义
CompositeKeyClass
SortComparatorClas
PartitionerClass
GroupingComparatorClass
利用好Shuffle这个很magic课程,可以实现很多奇妙的功能。
本次案例的代码在我的GitHub上。
这些都是个人的理解,如有不对之处,还请指正。

最后,本学期的云计算课到最后了才让我们去些MapReduce程序,之前做无关的MySQL的项目,然后迁移HBase,看着很高端,但是做那个网站到底有什么意义,这是云计算,不是前端计算,HBase in Action中也提到,HBase的设计和MySQL的设计是完全不同的,而MySQL我相信到这个阶段的同学们都会用,何必再次重复劳动呢?直接上HBase不就可以了,本末倒置是我对本学期的课程的失望和吐槽。课程内容多半是概念,涉及技术的有多少?看论文,看概念每一个研究生都可以做到,而我想我们缺的是实践上的指导,比如MapReduce,除了能讲讲WordCount,还能再深入点么?Shuffle的过程可以多说说么?Zookeeper里面的PAXOS算法可以跟我们说说么?学院有Hadoop的集群机器,可以让我们去玩玩不?而不是写那个无聊的网站。

我个人觉得让我们去写一些Hadoop的架构分析,看看源码,也比写那个无聊的网站强。最后9次作业,做的头大,重复劳动,体力劳动,最后还考试,说写的多分会高,有意思么?那么多作业还考试,数据库课程就两次作业+一个论文,但是我觉得我学了也复习了很多知识,比起那些大而空的概念,高到不知哪里去了。

我对云计算,Hadoop,HBase感兴趣,本学期学了很多,但不是课堂上学的,估计我最后对云计算的概念们,要靠下周背PPT了。 我对云计算有兴趣,但对本学期的课程没兴趣。吐槽都是不好的情绪,调整好心态,好好考试吧~BTW,下周还有一次作业。




回复

使用道具 举报

xng2012 发表于 2014-4-10 16:55:42
MapReduce不适用那些数据?
MapReduceHadoop中处理大数据的方法,是一个处理大数据的简单算法、编程泛型。虽然思想简单,但其实真正用起来还是有很多问题,不是所有的问题都可以像WordCount那样典型和直观, 有很多需要trick的地方。MapReduce的中心思想是分而治之,数据要松耦合,可以划分为小数据集并行处理,如果数据本身在计算上存在很强的依赖关系,就不要赶鸭子上架,用MapReduce了。


MapReduce如何降低实现的复杂度?
MapReduce编程中,最重要的是要抓住Map和Reduce的input和output,好的input和output可以降低实现的复杂度。


Map端和Reduce端是否都有Shuffle过程?
二者都包含Shuffle

回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条