分享

hadoop(大数据)对两列数据进行排序

需求说明:
1、对两列数据进行升序排列
2、先按第一列排序,再对第二列进行排序

案例数据
3        5
299        50
19        1
19        69
2        45
3        500
3        1
400        1
38        3
19        5


程序

Mapper
package com.fish.had.sort;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SortMapper extends Mapper<LongWritable, Text, NewK2Writable, LongWritable>{

        protected NewK2Writable k2 = null;

        @Override
        protected void map(
                        LongWritable k1,
                        Text v1,
                        Mapper<LongWritable, Text, NewK2Writable, LongWritable>.Context context)
                        throws IOException, InterruptedException {

                String[] ss = v1.toString().split("\t");
                k2 = new NewK2Writable(Long.parseLong(ss[0]), Long.parseLong(ss[1]));
                context.write(k2, new LongWritable(Long.parseLong(ss[1])));
        }
}

Reducer
package com.fish.had.sort;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class SortReducer extends Reducer<NewK2Writable, LongWritable, LongWritable, LongWritable>{

        protected void reduce(
                        NewK2Writable k2,
                        Iterable<LongWritable> v2s,
                        Reducer<NewK2Writable, LongWritable, LongWritable, LongWritable>.Context context)
                        throws IOException, InterruptedException {
                context.write(new LongWritable(k2.first), new LongWritable(k2.second));
        }
}

自定义Writable
package com.fish.had.sort;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class NewK2Writable implements WritableComparable<NewK2Writable>{

        long first;
        long second;

        public NewK2Writable(){}

        public NewK2Writable(long first, long second){
                this.first = first;
                this.second = second;
        }

        public void readFields(DataInput in) throws IOException {
                this.first = in.readLong();
                this.second = in.readLong();
        }

        public void write(DataOutput out) throws IOException {
                out.writeLong(this.first);
                out.writeLong(this.second);
        }

        @Override
        public int compareTo(NewK2Writable o) {
                long minus = this.first - o.first;
                if(minus != 0)
                        return (int) minus;
                return (int) (this.second - o.second);
        }

}


Main
package com.fish.had.sort;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class SortMain extends Configured implements Tool{

        public static void main(String[] args) throws Exception {
                if(args.length != 2){
                        System.err.print("----> parameter is two, not null");
                        System.exit(1);
                }
                ToolRunner.run(new SortMain(), args);
        }

        @SuppressWarnings("deprecation")
        public int run(String[] args) throws Exception {
                String INPATH = args[0];
                String OUTPATH = args[1];
                Path outPath = new Path(OUTPATH);

                Configuration conf = new Configuration();
                FileSystem fs = FileSystem.get(new URI(INPATH), conf);
                if(fs.exists(outPath)) fs.delete(outPath, true);

                Job job = new Job(conf, SortMain.class.getSimpleName());
                job.setJarByClass(SortMain.class);

                job.setInputFormatClass(TextInputFormat.class);
                job.setOutputFormatClass(TextOutputFormat.class);

                job.setMapperClass(SortMapper.class);
                job.setReducerClass(SortReducer.class);

                job.setMapOutputKeyClass(NewK2Writable.class);
                job.setMapOutputValueClass(LongWritable.class);

                job.setOutputKeyClass(LongWritable.class);
                job.setOutputValueClass(LongWritable.class);

                job.setPartitionerClass(HashPartitioner.class);
                job.setNumReduceTasks(1);

                FileInputFormat.setInputPaths(job, new Path(INPATH));
                FileOutputFormat.setOutputPath(job, outPath);

                job.waitForCompletion(true);
                return 0;
        }

}

运行
1.png

查看结果
2.png

已有(1)人评论

跳转到指定楼层
wangzhenqiang 发表于 2015-10-26 11:21:48
学习学习,整点文字说明就好了
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条