分享

MapReduce 二次排序和全排序

javaanddonet 发表于 2018-3-30 15:44:10 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 3 12455
本帖最后由 javaanddonet 于 2018-3-30 16:20 编辑

最近练习MR的二次排序和全排序。
先看输入文件:
aa 99
bb 98
cc 97
dd 96
aa 80
bb 70
cc 60
dd 77
aa 89
bb 91
cc 79
dd 83
aa 93
bb 94
cc 66
dd 86
3 60
4 22
5 10
6 50
3 81
4 75
5 67
6 27
3 19
4 51
5 69
6 33
3 43
4 64
5 46
6 36


我想让结果先按第一列升序排列,然后在这个基础上,在按照第二列降序排列,然后在取每一组的前两名。

我先考虑使用Map方法将所有key相同的值,聚集为<key,<v2,v4,v1,v3>>这种格式,然后在reduce阶段,对map的输出的value,转换为ArrayList<Int>数组,然后是JDK的Collections.sort(arrayList);进行排序。这样可以实现但是如果数据量很大,估计这个性能是个问题。
然后我查阅资料,需要自己定义一个map的输出类型,将输入文件的每一行值,当作一个key,然后把每一行第二列的值,当作一个value来交个map方法处理。Map的输出的KV是这种类型的:<<k1,v1,>, v1>;<<k2,v2,>, v2>。这就要自己去定义一个数据类型为KV对的数据类如下:然后在Map方法中,将这个数据类型提供给Map函数来使用,当作Map函数的输出的key的数据类型。我的问题来了
第一个问题:下面的这个自定义的数据类型中,我已经重写了compareTo方法,我发现这样已经ok了。已经实现类二次排序的功能。那为何我看网上很多人还要设置下面的这两个属性?并且在自己定义的实现类中重写了compare或compareTo方法。我感觉在自定义的数据类型中实现了两个字段的排序,这里不需要再次调用设置了。否则需要在这里进行重新配置实现。

job.setGroupingComparatorClass()//设置自定义分组的实现,当然这里会设置自己的实现类
job.setSortComparatorClass()//设置自定义排序的实现,当然这里会设置自己的实现类

第二个问题:不用设置reduce的实现,map任务完全可以完成二次排序了。问题网上很多人还要设置下面的reduce的实现?
job.setReducerClass(MyReduce.class)


第三个问题:设置分区的时候,我发现一个问题,如果设置了reduce的个数为1,那么即便是自定义了分区的实现,好像也不会调用实现类。因为我在自定义的分区中输出了一句话,真个MR运行完了,我发现没有输出我要输出的内容。而当我将reduce的个数设置2的时候,就会调用定义的分区实现类。也输出了我想输出内容。我能否这么认为:如果setNumReduceTasks设置为1,那么就不用自定义分区实现类了。因为即便是设置了,也不会调用。
job.setPartitionerClass(MyPartitionerClass.class);//设置分区
job.setNumReduceTasks(1)//设置reduce的个数

第四个问题:我想取排序后的每一组的前两名,怎么做?在reduce的迭代循环中,使用循环判定吗?如下代码中添加判断超过2就return不输出了?有没有类似于saprk中take(2)质量的函数?[mw_shl_code=java,true]text.set(reduceKeyInPairKeyWritable.getFirst());
                        for(IntWritable valueIntWritable : reduceValueInIterable){
                                context.write(text, valueIntWritable);
                        }[/mw_shl_code]



[mw_shl_code=java,true]//需要实现数据的序列化与反序列化,这样才能在多个节点之间传输数据!
//自定义组合数据类型,用与map和reduce的输入输出使用。
public class PairKeyWritable implements WritableComparable<PairKeyWritable>{
        private String first;
        private int second;
        public String getFirst() {
                return first;
        }
        public void setFirst(String first) {
                this.first = first;
        }
        public int getSecond() {
                return second;
        }
        public void setSecond(int second) {
                this.second = second;
        }
        public void set(String first, int second){
                this.setFirst(first);
                this.setSecond(second);
        }
        public PairKeyWritable(String first, int second) {
                this.set(first, second);
        }
        
        // 在反序列化时,反射机制需要调用空参构造函数,所以显示定义了一个空参构造函数  
        public PairKeyWritable() { }
        
        //用于输出结果以tab键隔开
        @Override
        public String toString() {
//                        return this.getFirst() + "\t" + this.getSecond();//可以自己灵活定义对象toString的输出的方式
//                        return this.getFirst() + "\t";
                return this.getFirst();
        }
        
        
        
        
        //该方法需要重写,因为在根据可以进行分区的时候,会使用到该方法如下所示,如果job设置了分区方法,则需要调用这个方法。
//                /** Use {@link Object#hashCode()} to partition. */
//                  public int getPartition(K key, V value,
//                                          int numReduceTasks) {
//                    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
//                  }
        @Override
        public int hashCode() {
                final int prime = 31;
                int result = 1;
                result = prime * result + ((first == null) ? 0 : first.hashCode());
                result = prime * result + second;
                return result;
        }
        //至于为何要重写hashCode和equals方法,看看这个帖子:hashcode方法的作用
        //https://blog.csdn.net/anmoyyh/article/details/76019777
        @Override
        public boolean equals(Object obj) {
                if (this == obj)
                        return true;
                if (obj == null)
                        return false;
                if (getClass() != obj.getClass())
                        return false;
                PairKeyWritable other = (PairKeyWritable) obj;
                if (first == null) {
                        if (other.first != null)
                                return false;
                } else if (!first.equals(other.first))
                        return false;
                if (second != other.second)
                        return false;
                return true;
        }
        
        //注意:读和写的时候,字段的顺序要一致,否则会出现读写字段混乱的问题。
        @Override
        public void write(DataOutput out) throws IOException {
                out.writeUTF(this.first);
                out.writeInt(this.second);
        }
        
        //注意:读和写的时候,字段的顺序要一致,否则会出现读写字段混乱的问题。
        @Override
        public void readFields(DataInput in) throws IOException {
                this.first = in.readUTF();
                this.second = in.readInt();
        }
        
        //比较排序,先按照第一个字段排序,如果第一个自动排序完成后,再按照第二个字段排序。
        @Override
        public int compareTo(PairKeyWritable o) {
                PairKeyWritable pairKeyWritable = (PairKeyWritable) o;
                int firstCompareResult = this.getFirst().compareTo(pairKeyWritable.getFirst());
               
                //如果比较结果大于0,返回1;如果小于0,返回-1;等于0,返回0。
                //如果直接返回第一个字段比较结果,而不是用下面的if...else分别返回,那么最终的结果只有第一个字段是排序的,第二个字段是无序的。
//                        return firstCompareResult > 0 ? 1 : firstCompareResult < 0 ? -1 : 0;
               
                if(firstCompareResult != 0){//如果比较结果不等0,表示两个值不相等,那么直接返回该比较结果,否则使用第二个字段进行比较。
                        return firstCompareResult > 0 ? 1 : -1;//升序
//                                return firstCompareResult > 0 ? -1 : 1;//降序
                }else{
                        int secondCompareResult = this.getSecond() - pairKeyWritable.getSecond();//如果比较结果大于0,返回1; 如果小于0,返回-1; 如果等于0,返回0,表示两个要比较的值相等。
//                                return secondCompareResult > 0 ? 1 : secondCompareResult < 0 ? -1 : 0;//升序
                        return secondCompareResult > 0 ? -1 : secondCompareResult < 0 ? 1 : 0;//降序
                }

        }
}[/mw_shl_code]



Map函数:
[mw_shl_code=java,true]public static class MyMap extends Mapper<LongWritable, Text, PairKeyWritable, IntWritable>{

                @Override
                protected void setup(Context context)
                                throws IOException, InterruptedException {
                        // TODO Auto-generated method stub
                }
               
                PairKeyWritable pairKeyWritable = new PairKeyWritable();
                String first = null;
                int second = 0;
                String[] lineStringArray = null;
                String line = null;
                // MapReduce框架每读一行数据就调用一次map方法
                @Override
                public void map(LongWritable mapKeyInLongWritable, Text mapValueInText, Context context) throws IOException, InterruptedException {
                        System.out.println("MyMap.........");
                        line = mapValueInText.toString();
                        lineStringArray = line.split(" ");
                        first = lineStringArray[0];
                        second = Integer.parseInt(lineStringArray[1]);
                        pairKeyWritable.set(first, second);
                        context.write(pairKeyWritable, new IntWritable(second));
                }
               
                @Override
                protected void cleanup(Context context)
                                throws IOException, InterruptedException {
                        // TODO Auto-generated method stub
                }
        }[/mw_shl_code]

Reduce函数:
[mw_shl_code=java,true]public static class MyReduce extends Reducer<PairKeyWritable, IntWritable, Text, IntWritable>{
               
                @Override
                protected void setup(Reducer<PairKeyWritable, IntWritable, Text, IntWritable>.Context context)
                                throws IOException, InterruptedException {
                        // TODO Auto-generated method stub
                }
                private Text text = new Text();
                @Override
                protected void reduce(PairKeyWritable reduceKeyInPairKeyWritable, Iterable<IntWritable> reduceValueInIterable, Context context) throws IOException, InterruptedException {
                        System.out.println("MyReduce====================");
                        //下面的for循环也可以正常显示已结果,这个更简洁。
//                        context.write(new Text(reduceKeyInPairKeyWritable.getFirst()), new IntWritable(reduceKeyInPairKeyWritable.getSecond()));
                        
                        text.set(reduceKeyInPairKeyWritable.getFirst());
                        for(IntWritable valueIntWritable : reduceValueInIterable){
                                context.write(text, valueIntWritable);
                        }
                        
                }
               
                @Override
                protected void cleanup(Reducer<PairKeyWritable, IntWritable, Text, IntWritable>.Context context)
                                throws IOException, InterruptedException {
                        // TODO Auto-generated method stub
                        super.cleanup(context);
                }
        }[/mw_shl_code]


分区函数:问题?如果reduce个数设置1,自定义的分区不会调用吗?
[mw_shl_code=java,true]public static class MyPartitionerClass extends Partitioner<PairKeyWritable, IntWritable>{
                @Override
                public int getPartition(PairKeyWritable mapKeyOut, IntWritable mapValueOut, int numPartitions) {
                        System.out.println("MyPartitionerClass====================");
                        System.out.println("numPartitions="+numPartitions);
                        //根据key的hash code进行分区。如果很大数据量需要考虑抽样分区,然后才能避免出现数据倾斜和访问热点的问题
                        System.out.println("mapKeyOut.getFirst()="+mapKeyOut.getFirst());
                        System.out.println("mapKeyOut.getFirst().hashCode()="+mapKeyOut.getFirst().hashCode());
                        //与reduce job的个数进相除然后取余数。
                        int partitionResult = (mapKeyOut.getFirst().hashCode() & Integer.MAX_VALUE) % numPartitions;
                        System.out.println("partitionResult="+partitionResult);
                        System.out.println("");
                        return partitionResult;
                }
        }[/mw_shl_code]


分组函数:这里有一个问题,见里面的注释。我在里面将Object转换一下不可以吗?
[mw_shl_code=java,true]public static class MyGroupingComparatorClass extends WritableComparator{
               
                //为何要这个方法?//必须要调用父类的构造器
                protected MyGroupingComparatorClass() {
                        super(PairKeyWritable.class, true);//注册comparator
                }
               
//                public int compare(Object o1, Object o2) {
                //为何上面的方法不行?必须要指定为:WritableComparable
                @Override
                public int compare(WritableComparable o1, WritableComparable o2) {
                        System.out.println("MyGroupingComparatorClass&&&&&&&&&&&&&&&&&&&&&&&&");
                        PairKeyWritable doubleSortBean_1 = (PairKeyWritable) o1;
                        PairKeyWritable doubleSortBean_2 = (PairKeyWritable) o2;
                        int minus = doubleSortBean_1.getFirst().compareTo(doubleSortBean_2.getFirst());
                return minus;
                }
        }[/mw_shl_code]


排序函数:
[mw_shl_code=java,true]public static class MySortComparatorClass extends WritableComparator{
               
                //为何要这个方法?//必须要调用父类的构造器
                protected MySortComparatorClass() {
                        super(PairKeyWritable.class, true);//注册comparator
                }
               
//                public int compare(Object o1, Object o2) {
                //为何上面的方法不行?必须要指定为:WritableComparable
                @Override
                public int compare(WritableComparable o1, WritableComparable o2) {
                        System.out.println("MySortComparatorClass###########################");
                        PairKeyWritable doubleSortBean_1 = (PairKeyWritable) o1;
                        PairKeyWritable doubleSortBean_2 = (PairKeyWritable) o2;
                        int firstComapreResult = doubleSortBean_1.getFirst().compareTo(doubleSortBean_2.getFirst());
                        //分组内部进行排序,按照第二个字段进行排序,首先要保证是同一个组内,同一个组的标识就是第一个字段相同
                if (firstComapreResult != 0){
                    return firstComapreResult > 0 ? 1 : -1;
                } else {  
                    int secondComapreResult = doubleSortBean_1.getSecond() - doubleSortBean_2.getSecond();
                    return secondComapreResult > 0 ? -1 : secondComapreResult < 0 ? 1 : 0;
                }
                }
        }[/mw_shl_code]





接下来是全排序的问题。我在使用InputSampler.RandomSampler<Text, Text>(0.1, 10000, 10);进行全排序的时候,为何看不到随机抽样的文件在HDFS上面,全局排序的设置如下:
[mw_shl_code=java,true] // 设置partition file全路径到conf
        Path path = new Path(args[2]);//随机抽样文件存放位置
        TotalOrderPartitioner.setPartitionFile(configration, path);
        // partitioner class设置成TotalOrderPartitioner
        job.setPartitionerClass(TotalOrderPartitioner.class);
        // RandomSampler第一个参数表示key会被选中的概率,第二个参数是一个选取samples数,第三个参数是最大读取input splits数  
        RandomSampler<Text, Text> sampler = new InputSampler.RandomSampler<Text, Text>(0.1, 10000, 10);
        // 写partition file到mapreduce.totalorderpartitioner.path
        InputSampler.writePartitionFile(job, sampler);[/mw_shl_code]

使用RandomSampler来做全局排序的时候,有一点比较恶心:要求Map的输入key和输出key的数据类型必须一致?这个太恶心了。我们还得自己重新定义InputFormat实现类。因为我的map输出的key是自定义的数据类型,而Map的输入key是使用的LongWritable类型的。除了自定义InputForMat实现类,还有其他的办法吗?



已有(3)人评论

跳转到指定楼层
javaanddonet 发表于 2018-3-31 16:28:40
有大神指导一下吗?
回复

使用道具 举报

fly2015 发表于 2018-4-2 14:45:22
非要用MR实现吗???
回复

使用道具 举报

javaanddonet 发表于 2018-4-2 17:14:20
fly2015 发表于 2018-4-2 14:45
非要用MR实现吗???

不一定是要用MR。我只不过想深入研究一下。
我这两天研究了,发现这个MR 能写死人的。
转为hive里面的表,然后用HQL很简单了。
再说,好像没有公司用MR了吧。
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条