分享

MapReduce算法的实现——1 TB数据排序源码分析

xioaxu790 发表于 2014-11-30 19:05:48 [显示全部楼层] 只看大图 回帖奖励 阅读模式 关闭右栏 1 16961
问题导读
1、Terasort算法的关键点有哪些?
2、map task如何对数据记录做标记?
3、map task怎样对一个数据块进行局部排序?






1、概述
1TB排序通常用于衡量分布式数据处理框架的数据处理能力。Terasort是Hadoop中的的一个排序作业,在2008年,Hadoop在1TB排序基准评估中赢得第一名,耗时209秒。那么Terasort在Hadoop中是怎样实现的呢?本文主要从算法设计角度分析Terasort作业。


2、算法思想
实际上,当我们要把传统的串行排序算法设计成并行的排序算法时,通常会想到分而治之的策略,即:把要排序的数据划成M个数据块(可以用Hash的方法做到),然后每个map task对一个数据块进行局部排序,之后,一个reduce task对所有数据进行全排序。这种设计思路可以保证在map阶段并行度很高,但在reduce阶段完全没有并行。
1.jpg

传统并行sort算法

为了提高reduce阶段的并行度,TeraSort作业对以上算法进行改进:在map阶段,每个map task都会将数据划分成R个数据块(R为reduce task个数),其中第i(i>0)个数据块的所有数据都会比第i+1个中的数据大;在reduce阶段,第i个reduce task处理(进行排序)所有map task的第i块,这样第i个reduce task产生的结果均会比第i+1个大,最后将1~R个reduce task的排序结果顺序输出,即为最终的排序结果。这种设计思路很明显比第一种高效,但实现难度较大,它需要解决以下两个技术难点:第一,如何确定每个map task数据的R个数据块的范围? 第二,对于某条数据,如果快速的确定它属于哪个数据块?答案分别为【采样】和【trie树】。
2.jpg

Terasort流程


3、Terasort算法
3.1  Terasort算法流程
对于Hadoop的Terasort排序算法,主要由3步组成:采样 –>> map task对于数据记录做标记 –>> reduce task进行局部排序。
数据采样在JobClient端进行,首先从输入数据中抽取一部分数据,将这些数据进行排序,然后将它们划分成R个数据块,找出每个数据块的数据上限和下线(称为“分割点”),并将这些分割点保存到分布式缓存中。
在map阶段,每个map task首先从分布式缓存中读取分割点,并对这些分割点建立trie树(两层trie树,树的叶子节点上保存有该节点对应的reduce task编号)。然后正式开始处理数据,对于每条数据,在trie树中查找它属于的reduce task的编号,并保存起来。
在reduce阶段,每个reduce task从每个map task中读取其对应的数据进行局部排序,最后将reduce task处理后结果按reduce task编号依次输出即可。

3.2    Terasort算法关键点
(1)采样
Hadoop自带了很多数据采样工具,包括IntercalSmapler,RandomSampler,SplitSampler等(具体见org.apache.hadoop.mapred.lib)。
采样数据条数:sampleSize = conf.getLong(“terasort.partitions.sample”, 100000);
选取的split个数:samples = Math.min(10, splits.length); splits是所有split组成的数组。
每个split提取的数据条数:recordsPerSample = sampleSize / samples;
对采样的数据进行全排序,将获取的“分割点”写到文件_partition.lst中,并将它存放到分布式缓存区中。
举例说明:比如采样数据为b,abc,abd,bcd,abcd,efg,hii,afd,rrr,mnk
经排序后,得到:abc,abcd,abd,afd,b,bcd,efg,hii,mnk,rrr
如果reduce task个数为4,则分割点为:abd,bcd,mnk

(2)map task对数据记录做标记
每个map task从文件_partition.lst读取分割点,并创建trie树(假设是2-trie,即组织利用前两个字节)。
Map task从split中一条一条读取数据,并通过trie树查找每条记录所对应的reduce task编号。比如:abg对应第二个reduce task, mnz对应第四个reduce task。
3.jpg

(3)reduce task进行局部排序
每个reduce task进行局部排序,依次输出结果即可。
  
4、MapReduce examples code
  1. /**
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements.  See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership.  The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License.  You may obtain a copy of the License at
  9. *
  10. *     http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. */  
  18.   
  19. package org.apache.hadoop.examples.terasort;  
  20.   
  21. import java.io.DataInputStream;  
  22. import java.io.IOException;  
  23. import java.io.PrintStream;  
  24. import java.net.URI;  
  25.   
  26. import org.apache.commons.logging.Log;  
  27. import org.apache.commons.logging.LogFactory;  
  28. import org.apache.hadoop.conf.Configurable;  
  29. import org.apache.hadoop.conf.Configuration;  
  30. import org.apache.hadoop.conf.Configured;  
  31. import org.apache.hadoop.fs.FileSystem;  
  32. import org.apache.hadoop.fs.Path;  
  33. import org.apache.hadoop.io.Text;  
  34. import org.apache.hadoop.mapreduce.Job;  
  35. import org.apache.hadoop.mapreduce.JobContext;  
  36. import org.apache.hadoop.mapreduce.MRJobConfig;  
  37. import org.apache.hadoop.mapreduce.Partitioner;  
  38. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
  39. import org.apache.hadoop.util.Tool;  
  40. import org.apache.hadoop.util.ToolRunner;  
  41.   
  42. /**
  43. * Generates the sampled split points, launches the job, and waits for it to
  44. * finish.  
  45. * <p>
  46. * To run the program:  
  47. * <b>bin/hadoop jar hadoop-*-examples.jar terasort in-dir out-dir</b>
  48. */  
  49. public class TeraSort extends Configured implements Tool {  
  50.   private static final Log LOG = LogFactory.getLog(TeraSort.class);  
  51.   static String SIMPLE_PARTITIONER = "mapreduce.terasort.simplepartitioner";  
  52.   static String OUTPUT_REPLICATION = "mapreduce.terasort.output.replication";  
  53.   
  54.   /**
  55.    * A partitioner that splits text keys into roughly equal partitions
  56.    * in a global sorted order.
  57.    */  
  58.   static class TotalOrderPartitioner extends Partitioner<Text,Text>  
  59.       implements Configurable {  
  60.     private TrieNode trie;  
  61.     private Text[] splitPoints;  
  62.     private Configuration conf;  
  63.   
  64.     /**
  65.      * A generic trie node
  66.      */  
  67.     static abstract class TrieNode {  
  68.       private int level;  
  69.       TrieNode(int level) {  
  70.         this.level = level;  
  71.       }  
  72.       abstract int findPartition(Text key);  
  73.       abstract void print(PrintStream strm) throws IOException;  
  74.       int getLevel() {  
  75.         return level;  
  76.       }  
  77.     }  
  78.   
  79.     /**
  80.      * An inner trie node that contains 256 children based on the next
  81.      * character.
  82.      */  
  83.     static class InnerTrieNode extends TrieNode {  
  84.       private TrieNode[] child = new TrieNode[256];  
  85.         
  86.       InnerTrieNode(int level) {  
  87.         super(level);  
  88.       }  
  89.       int findPartition(Text key) {  
  90.         int level = getLevel();  
  91.         if (key.getLength() <= level) {  
  92.           return child[0].findPartition(key);  
  93.         }  
  94.         return child[key.getBytes()[level] & 0xff].findPartition(key);  
  95.       }  
  96.       void setChild(int idx, TrieNode child) {  
  97.         this.child[idx] = child;  
  98.       }  
  99.       void print(PrintStream strm) throws IOException {  
  100.         for(int ch=0; ch < 256; ++ch) {  
  101.           for(int i = 0; i < 2*getLevel(); ++i) {  
  102.             strm.print(' ');  
  103.           }  
  104.           strm.print(ch);  
  105.           strm.println(" ->");  
  106.           if (child[ch] != null) {  
  107.             child[ch].print(strm);  
  108.           }  
  109.         }  
  110.       }  
  111.     }  
  112.   
  113.     /**
  114.      * A leaf trie node that does string compares to figure out where the given
  115.      * key belongs between lower..upper.
  116.      */  
  117.     static class LeafTrieNode extends TrieNode {  
  118.       int lower;  
  119.       int upper;  
  120.       Text[] splitPoints;  
  121.       LeafTrieNode(int level, Text[] splitPoints, int lower, int upper) {  
  122.         super(level);  
  123.         this.splitPoints = splitPoints;  
  124.         this.lower = lower;  
  125.         this.upper = upper;  
  126.       }  
  127.       int findPartition(Text key) {  
  128.         for(int i=lower; i<upper; ++i) {  
  129.           if (splitPoints[i].compareTo(key) > 0) {  
  130.             return i;  
  131.           }  
  132.         }  
  133.         return upper;  
  134.       }  
  135.       void print(PrintStream strm) throws IOException {  
  136.         for(int i = 0; i < 2*getLevel(); ++i) {  
  137.           strm.print(' ');  
  138.         }  
  139.         strm.print(lower);  
  140.         strm.print(", ");  
  141.         strm.println(upper);  
  142.       }  
  143.     }  
  144.   
  145.   
  146.     /**
  147.      * Read the cut points from the given sequence file.
  148.      * @param fs the file system
  149.      * @param p the path to read
  150.      * @param job the job config
  151.      * @return the strings to split the partitions on
  152.      * @throws IOException
  153.      */  
  154.     private static Text[] readPartitions(FileSystem fs, Path p,  
  155.         Configuration conf) throws IOException {  
  156.       int reduces = conf.getInt(MRJobConfig.NUM_REDUCES, 1);  
  157.       Text[] result = new Text[reduces - 1];  
  158.       DataInputStream reader = fs.open(p);  
  159.       for(int i=0; i < reduces - 1; ++i) {  
  160.         result[i] = new Text();  
  161.         result[i].readFields(reader);  
  162.       }  
  163.       reader.close();  
  164.       return result;  
  165.     }  
  166.   
  167.     /**
  168.      * Given a sorted set of cut points, build a trie that will find the correct
  169.      * partition quickly.
  170.      * @param splits the list of cut points
  171.      * @param lower the lower bound of partitions 0..numPartitions-1
  172.      * @param upper the upper bound of partitions 0..numPartitions-1
  173.      * @param prefix the prefix that we have already checked against
  174.      * @param maxDepth the maximum depth we will build a trie for
  175.      * @return the trie node that will divide the splits correctly
  176.      */  
  177.     private static TrieNode buildTrie(Text[] splits, int lower, int upper,   
  178.                                       Text prefix, int maxDepth) {  
  179.       int depth = prefix.getLength();  
  180.       if (depth >= maxDepth || lower == upper) {  
  181.         return new LeafTrieNode(depth, splits, lower, upper);  
  182.       }  
  183.       InnerTrieNode result = new InnerTrieNode(depth);  
  184.       Text trial = new Text(prefix);  
  185.       // append an extra byte on to the prefix  
  186.       trial.append(new byte[1], 0, 1);  
  187.       int currentBound = lower;  
  188.       for(int ch = 0; ch < 255; ++ch) {  
  189.         trial.getBytes()[depth] = (byte) (ch + 1);  
  190.         lower = currentBound;  
  191.         while (currentBound < upper) {  
  192.           if (splits[currentBound].compareTo(trial) >= 0) {  
  193.             break;  
  194.           }  
  195.           currentBound += 1;  
  196.         }  
  197.         trial.getBytes()[depth] = (byte) ch;  
  198.         result.child[ch] = buildTrie(splits, lower, currentBound, trial,   
  199.                                      maxDepth);  
  200.       }  
  201.       // pick up the rest  
  202.       trial.getBytes()[depth] = (byte) 255;  
  203.       result.child[255] = buildTrie(splits, currentBound, upper, trial,  
  204.                                     maxDepth);  
  205.       return result;  
  206.     }  
  207.   
  208.     public void setConf(Configuration conf) {  
  209.       try {  
  210.         FileSystem fs = FileSystem.getLocal(conf);  
  211.         this.conf = conf;  
  212.         Path partFile = new Path(TeraInputFormat.PARTITION_FILENAME);  
  213.         splitPoints = readPartitions(fs, partFile, conf);  
  214.         trie = buildTrie(splitPoints, 0, splitPoints.length, new Text(), 2);  
  215.       } catch (IOException ie) {  
  216.         throw new IllegalArgumentException("can't read partitions file", ie);  
  217.       }  
  218.     }  
  219.   
  220.     public Configuration getConf() {  
  221.       return conf;  
  222.     }  
  223.       
  224.     public TotalOrderPartitioner() {  
  225.     }  
  226.   
  227.     public int getPartition(Text key, Text value, int numPartitions) {  
  228.       return trie.findPartition(key);  
  229.     }  
  230.       
  231.   }  
  232.    
  233.   /**
  234.    * A total order partitioner that assigns keys based on their first  
  235.    * PREFIX_LENGTH bytes, assuming a flat distribution.
  236.    */  
  237.   public static class SimplePartitioner extends Partitioner<Text, Text>  
  238.       implements Configurable {  
  239.     int prefixesPerReduce;  
  240.     private static final int PREFIX_LENGTH = 3;  
  241.     private Configuration conf = null;  
  242.     public void setConf(Configuration conf) {  
  243.       this.conf = conf;  
  244.       prefixesPerReduce = (int) Math.ceil((1 << (8 * PREFIX_LENGTH)) /   
  245.         (float) conf.getInt(MRJobConfig.NUM_REDUCES, 1));  
  246.     }  
  247.       
  248.     public Configuration getConf() {  
  249.       return conf;  
  250.     }  
  251.       
  252.     @Override  
  253.     public int getPartition(Text key, Text value, int numPartitions) {  
  254.       byte[] bytes = key.getBytes();  
  255.       int len = Math.min(PREFIX_LENGTH, key.getLength());  
  256.       int prefix = 0;  
  257.       for(int i=0; i < len; ++i) {  
  258.         prefix = (prefix << 8) | (0xff & bytes[i]);  
  259.       }  
  260.       return prefix / prefixesPerReduce;  
  261.     }  
  262.   }  
  263.   
  264.   public static boolean getUseSimplePartitioner(JobContext job) {  
  265.     return job.getConfiguration().getBoolean(SIMPLE_PARTITIONER, false);  
  266.   }  
  267.   
  268.   public static void setUseSimplePartitioner(Job job, boolean value) {  
  269.     job.getConfiguration().setBoolean(SIMPLE_PARTITIONER, value);  
  270.   }  
  271.   
  272.   public static int getOutputReplication(JobContext job) {  
  273.     return job.getConfiguration().getInt(OUTPUT_REPLICATION, 1);  
  274.   }  
  275.   
  276.   public static void setOutputReplication(Job job, int value) {  
  277.     job.getConfiguration().setInt(OUTPUT_REPLICATION, value);  
  278.   }  
  279.   
  280.   public int run(String[] args) throws Exception {  
  281.     LOG.info("starting");  
  282.     Job job = Job.getInstance(getConf());  
  283.     Path inputDir = new Path(args[0]);  
  284.     Path outputDir = new Path(args[1]);  
  285.     boolean useSimplePartitioner = getUseSimplePartitioner(job);  
  286.     TeraInputFormat.setInputPaths(job, inputDir);  
  287.     FileOutputFormat.setOutputPath(job, outputDir);  
  288.     job.setJobName("TeraSort");  
  289.     job.setJarByClass(TeraSort.class);  
  290.     job.setOutputKeyClass(Text.class);  
  291.     job.setOutputValueClass(Text.class);  
  292.     job.setInputFormatClass(TeraInputFormat.class);  
  293.     job.setOutputFormatClass(TeraOutputFormat.class);  
  294.     if (useSimplePartitioner) {  
  295.       job.setPartitionerClass(SimplePartitioner.class);  
  296.     } else {  
  297.       long start = System.currentTimeMillis();  
  298.       Path partitionFile = new Path(outputDir,   
  299.                                     TeraInputFormat.PARTITION_FILENAME);  
  300.       URI partitionUri = new URI(partitionFile.toString() +  
  301.                                  "#" + TeraInputFormat.PARTITION_FILENAME);  
  302.       try {  
  303.         TeraInputFormat.writePartitionFile(job, partitionFile);  
  304.       } catch (Throwable e) {  
  305.         LOG.error(e.getMessage());  
  306.         return -1;  
  307.       }  
  308.       job.addCacheFile(partitionUri);   
  309.       long end = System.currentTimeMillis();  
  310.       System.out.println("Spent " + (end - start) + "ms computing partitions.");  
  311.       job.setPartitionerClass(TotalOrderPartitioner.class);  
  312.     }  
  313.       
  314.     job.getConfiguration().setInt("dfs.replication", getOutputReplication(job));  
  315.     TeraOutputFormat.setFinalSync(job, true);  
  316.     int ret = job.waitForCompletion(true) ? 0 : 1;  
  317.     LOG.info("done");  
  318.     return ret;  
  319.   }  
  320.   
  321.   /**
  322.    * @param args
  323.    */  
  324.   public static void main(String[] args) throws Exception {  
  325.     int res = ToolRunner.run(new Configuration(), new TeraSort(), args);  
  326.     System.exit(res);  
  327.   }  
  328.   
  329. }
复制代码




已有(1)人评论

跳转到指定楼层
kanwei163 发表于 2014-12-1 10:43:55
TeraOutputFormat 这个类在哪??
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条