分享

二次排序SecondarySort 例子出错

求大神指教一下,为什么有错误

package org.apache.hadoop.examples;


import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;



import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import org.apache.hadoop.util.GenericOptionsParser;

public class SecondarySortEX {
        

        public static class IntPair implements WritableComparable<IntPair>{
                int first =0;
                int second=0;
               
               
                public void set(int left,int right){
                        
                        first =left;
                        second=right;
                        
                }
                public int getFirst(){
                        return first;
                        
                }
                public int getSecond(){
                        return second;
                        
                }
                @Override
                public void readFields(DataInput in) throws IOException {
                        first=in.readInt();
                        second=in.readInt();
                        
                        
                }
                @Override
                public void write(DataOutput out) throws IOException {
                        out.writeInt(first);
                        out.write(second);
                }

                        public int compareTo(IntPair o){
                                if (first!=o.first)
                                {
                                        return first<o.first?-1:1;
                                
                                }
                                else if(second!=o.second)
                                {
                                        return second<o.second?-1:1;
                                }
                                else
                                {
                                        return 0;
                                       
                                }
                        }
                                
               
                @Override
                public int hashCode() {
                        return first*157+second;
                        

                }
                public boolean equals(Object right){
               
                        if(right==null)
                                return false;
                        if(this==right)
                                return true;
                        
                        if(right instanceof IntPair){
                                IntPair r=(IntPair)right;
                                return r.first==first&&r.second==second;
                        }
                        else{
                                return false;
                        }
                }
        }
        
        
        public static class FirstPartitioner extends Partitioner<IntPair,IntWritable>{
               
                public int getPartition(IntPair key,IntWritable value,int numPartitions)
                {
                        return Math.abs(key.getFirst()*127) % numPartitions;
                }
        }

               
        public static class GroupingComparator implements RawComparator<IntPair>{

                @Override
                public int compare(IntPair o1, IntPair o2) {
                        int first1=o1.getFirst();
                        int first2=o2.getFirst();
                        return first1==first2?0:(first1<first2?-1:1);
                        
                }

                @Override
                public int compare(byte[] b1, int s1, int l1, byte[] b2,
                                int s2, int l2) {
                        
                                return WritableComparator.compareBytes(b1, s1,Integer.SIZE/8, b2, s2, Integer.SIZE/8);
                }
               
               
               
        }

        public static class MapClass extends Mapper<LongWritable,Text,IntPair,IntWritable>{
               
                private final IntPair inkey=new IntPair();
                private final IntWritable invalue=new IntWritable();
                @Override
                protected void map(LongWritable Key, Text Value, Context context)
                                throws IOException, InterruptedException {
                        
                        StringTokenizer itr=new StringTokenizer(invalue.toString());
                        int left=0;
                        int right=0;
                        if(itr.hasMoreTokens()){
                                left=Integer.parseInt(itr.nextToken());
                                if(itr.hasMoreTokens()){
                                        right=Integer.parseInt(itr.nextToken());
                                       
                                }
                                inkey.set(left, right);
                                invalue.set(right);
                                context.write(inkey, invalue);
                                
                        }
                        
                }

                public static class ReduceClass extends Reducer<IntPair,IntWritable,Text,IntWritable>{
                        
                        private final Text SEPARATOR=new Text("--------------------------------------");
                        
                        private final Text first=new Text();

                        @Override
                        protected void reduce(IntPair key, Iterable<IntWritable> values,
                                Context context)
                                        throws IOException, InterruptedException {
                                context.write(SEPARATOR, null);
                                first.set(Integer.toString(key.getFirst()));
                                for(IntWritable value:values){
                                        context.write(first, value);
                                       
                                }

                        }
                        
                        
                }
               

                        public static void main(String[] args) throws IOException,
                        ClassNotFoundException, InterruptedException {
                System.out.println("Begin");
                Configuration conf = new Configuration();
                String[] otherArgs = new GenericOptionsParser(conf, args)
                                .getRemainingArgs();
                if (otherArgs.length < 2) {
                        System.out.println("please input at least 2 arguments");
                        System.exit(2);
                }

                Job job = new Job(conf, "SecondarySort Score");
                job.setJarByClass(SecondarySortEX.class);
                job.setMapperClass(MapClass.class);
               
                job.setPartitionerClass(FirstPartitioner.class);
                job.setGroupingComparatorClass(GroupingComparator.class);
               
                job.setReducerClass(ReduceClass.class);
               
                job.setMapOutputKeyClass(IntPair.class);
                job.setMapOutputValueClass(IntWritable.class);
               
                job.setOutputKeyClass(Text.class);
                job.setOutputValueClass(IntWritable.class);
               
                job.setInputFormatClass(TextInputFormat.class);
                job.setOutputFormatClass(TextOutputFormat.class);

        

                FileInputFormat.setInputPaths(job, new Path(otherArgs[0]));
                FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

                System.exit(job.waitForCompletion(true) ? 0 : 1);

                System.out.println("End");
                        
                        
                }
               
        }

}

错误·

错误·


已有(1)人评论

跳转到指定楼层
when30 发表于 2016-3-24 11:51:50
public static class MapClass extends Mapper<LongWritable,Text,IntPair,IntWritable>{

                private final IntPair inkey=new IntPair();
                private final IntWritable invalue=new IntWritable();
                @Override
                protected void map(LongWritable Key, Text Value, Context context)
                                throws IOException, InterruptedException {

                        StringTokenizer itr=new StringTokenizer(invalue.toString());
                        int left=0;
                        int right=0;
                        if(itr.hasMoreTokens()){
                                left=Integer.parseInt(itr.nextToken());
                                if(itr.hasMoreTokens()){
                                        right=Integer.parseInt(itr.nextToken());

                                }
                                inkey.set(left, right);
                                invalue.set(right);
                                context.write(inkey, invalue);

                        }

                }

                public static class ReduceClass extends Reducer<IntPair,IntWritable,Text,IntWritable>{

                        private final Text SEPARATOR=new Text("--------------------------------------");
                        
                        private final Text first=new Text();

                        @Override
                        protected void reduce(IntPair key, Iterable<IntWritable> values,
                                Context context)
                                        throws IOException, InterruptedException {
                                context.write(SEPARATOR, null);
                                first.set(Integer.toString(key.getFirst()));
                                for(IntWritable value:values){
                                        context.write(first, value);

                                }

                        }


                }



楼主上面放到函数的含义是什么?
能否放到里面

回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条