本帖最后由 小寒co1de 于 2017-10-15 14:01 编辑
补充完整代码（以本地测试模式运行，未添加集群配置信息）：
[mw_shl_code=java,true]
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.*;
import static java.io.FileDescriptor.out;
/**
* Created by hadoop on 2017/9/30 0030.
*/
public class Relative {
public static class MyMap extends Mapper<LongWritable, Text, RelativeKey, IntWritable> {
BufferedWriter bf = null;
protected void setup(Mapper<LongWritable, Text, RelativeKey, IntWritable>.Context context) throws IOException, InterruptedException {
bf = new BufferedWriter(new OutputStreamWriter(new FileOutputStream("mapout")));
}
public void map(LongWritable ikey, Text ivalue, Context context)
throws IOException, InterruptedException {
String[] line = ivalue.toString().split(" ");
for(int i = 1; i<line.length;i++){
for(int j = i+1;j<line.length;j++){
RelativeKey rk= new RelativeKey();
rk.setKeySet(line);
rk.setKeySet(line[j]);
context.write(rk,new IntWritable(1));
String keyset="";
for(String s : rk.getKeySet()){
keyset+=s;
}
bf.write(keyset+1);
bf.newLine();
}
}
// bf.close();
}
public void cleanup(Mapper<LongWritable, Text, RelativeKey, IntWritable>.Context context) throws IOException, InterruptedException {
bf.close();
}
}
public static class MyReduce extends Reducer<RelativeKey, IntWritable, Text, IntWritable> {
public void setup(Reducer<RelativeKey, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
}
public void reduce(RelativeKey _key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
// 累加值
int count = 0;
for (IntWritable val : values) {
count +=Integer.parseInt(val.toString());
}
//测试收到的key对
System.out.println("keySize is "+_key.getKeySet().size());
String keyset="";
for(String s : _key.getKeySet()){
keyset+=s;
}
System.out.println(keyset);
//将key整理成指定形式
String[] keyString = new String[10]; //本应该是2,但是会数组越界,写成10发现读出来好长一串,就是前文里提到的
int i = 0;
for (String s: _key.getKeySet()) {
keyString[i++] = s;
}
// System.out.println("----------");
String outkey = keyString[0]+"-"+keyString[1];
context.write(new Text(outkey),new IntWritable(count));
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "JobName");
job.setJarByClass(Relative.class);
// TODO: specify a mapper
job.setMapperClass(Relative.MyMap.class);
// TODO: specify a reducer
job.setReducerClass(Relative.MyReduce.class);
job.setMapOutputKeyClass(RelativeKey.class);
job.setMapOutputValueClass(IntWritable.class);
// TODO: specify output types
// job.setOutputKeyClass(Text.class);
// job.setOutputValueClass(Text.class);
// TODO: specify input and output DIRECTORIES (not files)
FileInputFormat.setInputPaths(job, new Path("C:\\Users\\lenovo\\Desktop\\wordcount\\src\\input\\input3.txt"));
FileOutputFormat.setOutputPath(job, new Path("C:\\Users\\lenovo\\Desktop\\wordcount\\src\\output4"));
Path path = new Path("C:\\Users\\lenovo\\Desktop\\wordcount\\src\\output4");
FileSystem fileSystem = path.getFileSystem(conf);// 根据path找到这个文件
if (fileSystem.exists(path)) {
fileSystem.delete(path, true);// true的意思是,就算output有东西,也一带删除
}
if (!job.waitForCompletion(true))
return;
}
}
[/mw_shl_code]