分享

MapReduce中combine、partition、shuffle的作用是什么?在程序中怎么运用?

chenhaoyes 发表于 2014-8-27 15:32:03 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 13 115064
现在主要用的是map()和reduce(),使用combine时也是调用的reduce()的代码,请问下combine、partition、shuffle各自的作用,能不能举例说明下。

已有(13)人评论

跳转到指定楼层
sstutu 发表于 2014-8-27 18:08:36
更多资料:

彻底了解mapreduce核心Shuffle--解惑各种mapreduce问题


网上的资料很多,关于类的很少,这里补充一些:


InputFormat类:该类的作用是将输入的文件和数据分割成许多小的split文件,并将split的每个行通过LineRecorderReader解析成<Key,Value>,通过job.setInputFromatClass()函数来设置,默认的情况为类TextInputFormat,其中Key默认为字符偏移量,value是该行的值。

Map类:根据输入的<Key,Value>对生成中间结果,默认的情况下使用Mapper类,该类将输入的<Key,Value>对原封不动的作为中间按结果输出,通过job.setMapperClass()实现。实现Map函数。

Combine类:实现combine函数,该类的主要功能是合并相同的key键,通过job.setCombinerClass()方法设置,默认为null,不合并中间结果。实现map函数

Partitioner类: 该该主要在Shuffle过程中按照Key值将中间结果分成R份,其中每份都有一个Reduce去负责,可以通过job.setPartitionerClass()方法进行设置,默认的使用hashPartitioner类。实现getPartition函数

Reducer类:将中间结果合并,得到中间结果。通过job.setReduceCalss()方法进行设置,默认使用Reducer类,实现reduce方法。

OutPutFormat类:该类负责输出结果的格式。可以通过job.setOutputFormatClass()方法进行设置。默认使用TextOUtputFormat类,得到<Key,value>对。
note:hadoop主要是上面的六个类进行mapreduce操作,使用默认的类,处理的数据和文本的能力很有限,具体的项目中,用户通过改写这六个类(重载六个类),完成项目的需求。说实话,我刚开始学的时候,我怀疑过Mapreudce处理数据功能,随着学习深入,真的很钦佩mapreduce的设计,基本就二个函数,通过重载,可以完成所有你想完成的工作。


  1. public  static void main(String[] args)throws IOException {
  2.         Configuration conf = new Configuration();
  3.         Job job = new Job(conf);
  4.         job.setInputFormatClass(TextInputFormat.class);
  5.         job.setMapperClass(Mapper.class);
  6.         job.setCombinerClass(null);
  7.         job.setPartitionerClass(HashPartitioner.class);
  8.         job.setReducerClass(Reducer.class);
  9.         job.setOutputFormatClass(TextOutFormat.class);
  10.     }
  11. }
复制代码






回复

使用道具 举报

sstutu 发表于 2014-8-27 18:01:27
本帖最后由 pig2 于 2014-8-27 20:29 编辑

Mapreduce在hadoop中是一个比较难以的概念。下面需要用心看,然后自己就能总结出来了。

概括:
combine和partition都是函数,中间的步骤应该只有shuffle!

1.combine
combine分为map端和reduce端,作用是把同一个key的键值对合并在一起,可以自定义的。
combine函数把一个map函数产生的<key,value>对(多个key,value)合并成一个新的<key2,value2>.将新的<key2,value2>作为输入到reduce函数中
这个value2亦可称之为values,因为有多个。这个合并的目的是为了减少网络传输。

具体实现是由Combine类。
实现combine函数,该类的主要功能是合并相同的key键,通过job.setCombinerClass()方法设置,默认为null,不合并中间结果。实现map函数
具体调用:(下图是调用reduce,合并map的个数)

难点:不知道这个reduce和mapreduce中的reduce区别是什么?
下面简单说一下:后面慢慢琢磨:
在mapreduce中,map多,reduce少。
在reduce中由于数据量比较多,所以干脆,我们先把自己map里面的数据归类,这样到了reduce的时候就减轻了压力。

这里举个例子:
map与reduce的例子
map理解为销售人员,reduce理解为销售经理。
每个人(map)只管销售,赚了多少钱销售人员不统计,也就是说这个销售人员没有Combine,那么这个销售经理就累垮了,因为每个人都没有统计,它需要统计所有人员卖了多少件赚钱了多少钱。
这样是不行的,所以销售经理(reduce)为了减轻压力,每个人(map)都必须统计自己卖了多少钱,赚了多少钱(Combine),然后经理所做的事情就是统计每个人统计之后的结果。这样经理就轻松多了。所以Combine在map所做的事情,减轻了reduce的事情。
(这就是为什么说map的Combine干reduce的事情,相信你应该明白了)
public  static void main(String[] args)throws IOException {
        Configuration conf = new Configuration();
        Job job = new Job(conf);
        job.setInputFormatClass(TextInputFormat.class);
        job.setMapperClass(Mapper.class);
        job.setCombinerClass(reduce.class);
        job.setPartitionerClass(HashPartitioner.class);
        job.setReducerClass(Reducer.class);
        job.setOutputFormatClass(TextOutFormat.class);
    }
}



2.partition
partition是分割map每个节点的结果,按照key分别映射给不同的reduce,也是可以自定义的。这里其实可以理解归类。
我们对于错综复杂的数据归类。比如在动物园里有牛羊鸡鸭鹅,他们都是混在一起的,但是到了晚上他们就各自牛回牛棚,羊回羊圈,鸡回鸡窝。partition的作用就是把这些数据归类。只不过在写程序的时候,mapreduce使用哈希HashPartitioner帮我们归类了。这个我们也可以自定义。

        HashPartitioner是mapreduce的默认partitioner。计算方法是

which reducer=(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks,得到当前的目的reducer。

下面在看该如何自定义,该如何调用:(下面便是自定义了一个Partition函数,红字部分是算法的核心,也就是分区的核心
public static class Partition extends Partitioner<IntWritable, IntWritable> {
                @Override
                public int getPartition(IntWritable key, IntWritable value,
                                int numPartitions) {
                        int Maxnumber = 65223;
                        int bound = Maxnumber / numPartitions + 1;
                        int keynumber = key.get();
                        for (int i = 0; i < numPartitions; i++) {
                                if (keynumber < bound * i && keynumber >= bound * (i - 1)) {
                                        return i - 1;
                                }

                        }
                        return 0;
                }

        }

那么我们该如何调用:(下面调用之后,你的分区函数就生效了)

public static void main(String[] args) throws IOException,
InterruptedException, ClassNotFoundException {
Configuration conf = new Configuration();
Job job = new Job(conf, "sort");
job.setJarByClass(Sort.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setPartitionerClass(Partition.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.setInputPaths(job, "/home/asheng/hadoop/in");
FileOutputFormat
.setOutputPath(job, new Path("/home/asheng/hadoop/out"));
job.waitForCompletion(true);
}
}



3.shuffle

shuffle就是map和reduce之间的过程,包含了两端的combine和partition。它比较难以理解,因为我们摸不着,看不到它,它只是理论存在的,而且确实存在,它属于mapreduce的框架,编程的时候,我们用不到它,它属于mapreduce框架。详细可以看通过实例让你真正明白mapreduce---填空式、分布(分割)编程

3.1shuffle的作用是
Map的结果,会通过partition分发到Reducer上,Reducer做完Reduce操作后,通过OutputFormat,进行输出
shuffle阶段的主要函数是fetchOutputs(),这个函数的功能就是将map阶段的输出,copy到reduce 节点本地。






回复

使用道具 举报

chenhaoyes 发表于 2014-8-27 19:46:10
回复

使用道具 举报

hb1984 发表于 2014-8-27 21:39:38
谢谢楼主分享。            
回复

使用道具 举报

chenhaoyes 发表于 2014-8-28 10:12:18
sstutu 发表于 2014-8-27 18:08
更多资料:

彻底了解mapreduce核心Shuffle--解惑各种mapreduce问题

Map任务和Reduce任务都是在集群上很多服务器上运行的,Reduce任务具体在哪台服务器运行时根据partition来决定的,那Map任务呢?
回复

使用道具 举报

GreenArrow 发表于 2014-8-31 09:45:37
受用了,谢谢
回复

使用道具 举报

dearboll 发表于 2014-9-1 18:25:03
楼主你好,想请教一下,combine和partition到底谁先执行?是先执行partition函数确定map的结果由哪个reduce执行,然后才进行combine的吗?
回复

使用道具 举报

tang 发表于 2015-4-3 21:31:50
回复

使用道具 举报

蒲公英_gJM1Z 发表于 2015-4-10 16:22:01
dearboll 发表于 2014-9-1 18:25
楼主你好,想请教一下,combine和partition到底谁先执行?是先执行partition函数确定map的结果由哪个reduce ...

可以看看hadoop技术内幕深入解析MR,里面有一章Task,感觉介绍的还不错

回复

使用道具 举报

12下一页
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条