热度 3||
来个实例练习一下:
package com.k3.mapreduce;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * Counts the occurrences of each word in the input and registers a combiner
 * for the job.
 *
 * <p>Usage: {@code WordCountCB <input path> <output path>}. The output path
 * must not already exist.
 *
 * @author Administrator
 */
public class WordCountCB {

    /**
     * Configures the word-count job, submits it and waits for it to finish.
     *
     * @param args exactly two elements: the input path and the output path
     * @throws IllegalArgumentException if {@code args} is null or does not
     *         contain exactly two elements
     * @throws IllegalStateException if the job completes unsuccessfully
     * @throws Exception if the job cannot be configured or submitted
     */
    public static void main(String[] args) throws Exception {
        if (args == null || args.length != 2) {
            throw new IllegalArgumentException("输入参数不正确!");
        }

        Configuration conf = new Configuration();
        // Job.getInstance replaces the deprecated Job(Configuration, String) constructor.
        Job job = Job.getInstance(conf, "WordCountCB");

        // Class used to locate the jar that contains the job.
        job.setJarByClass(WordCountCB.class);

        // Mapper and reducer implementations.
        job.setMapperClass(WordCBMapper.class);
        job.setReducerClass(WordCBReduce.class);

        // Register the combiner. Because the combiner and the reducer perform
        // the same summation here, WordCBReduce could be registered as the
        // combiner instead of a dedicated class.
        job.setCombinerClass(WordCountCombiner.class);

        // Mapper output types.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        // Reducer (final) output types.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // Input path, and output path (which must not exist yet).
        String input = args[0];
        String output = args[1];
        FileInputFormat.addInputPath(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));

        // Submit and block until completion; surface a failed job to the
        // caller instead of returning as if it had succeeded.
        if (!job.waitForCompletion(true)) {
            throw new IllegalStateException("WordCountCB job failed");
        }
    }

    /**
     * Emits {@code (word, 1)} for every whitespace-separated token of an
     * input line.
     */
    static class WordCBMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

        /** Constant count written for every token. */
        private static final LongWritable ONE = new LongWritable(1);

        /** Reused output key, so no new Text is allocated per token. */
        private final Text word = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // StringTokenizer splits on whitespace and never yields empty
            // tokens, so blank lines and repeated separators are not counted
            // as an empty "word".
            StringTokenizer tokens = new StringTokenizer(value.toString());
            while (tokens.hasMoreTokens()) {
                word.set(tokens.nextToken());
                context.write(word, ONE);
            }
        }
    }

    /**
     * Sums all counts received for a word and writes {@code (word, total)}.
     */
    static class WordCBReduce extends Reducer<Text, LongWritable, Text, LongWritable> {

        /** Reused output value. */
        private final LongWritable total = new LongWritable();

        @Override
        protected void reduce(Text key, Iterable<LongWritable> val, Context context)
                throws IOException, InterruptedException {
            // Accumulate in a long: the values are longs, and an int
            // accumulator would silently wrap for large totals.
            long sum = 0L;
            for (LongWritable w : val) {
                sum += w.get();
            }
            total.set(sum);
            context.write(key, total);
        }
    }

    /**
     * Combiner that sums the counts for a word. Its input types are the same
     * as the reducer's input types.
     *
     * @author Administrator
     */
    static class WordCountCombiner extends Reducer<Text, LongWritable, Text, LongWritable> {

        /** Reused output value. */
        private final LongWritable partial = new LongWritable();

        @Override
        protected void reduce(Text key, Iterable<LongWritable> val, Context context)
                throws IOException, InterruptedException {
            // Primitive accumulator avoids boxing on every iteration.
            long sum = 0L;
            for (LongWritable v : val) {
                sum += v.get();
            }
            partial.set(sum);
            context.write(key, partial);
        }
    }
}