在本文中,将对Tomcat developer mailing list archives数据中每个人发送的邮件所收到的回复数量进行统计。由于在原数据中所需要得到的值分布在不同的行,因此需要自定义InputFormat和RecordReader。利用自定义的InputFormat和RecordReader获取到邮件的subject、from和date,然后通过map过程,以subject为key,以date#from为value,最后在reduce过程中,对每个主题(subject)中的values按照时间找出最早发送的邮件:最早发送邮件的人为发起人,其他人都为回复者,以此统计每个发起人得到的回复数量。在reduce中用到了Java中的TreeMap。具体代码如下:
(数据集可以从
http://mail-archives.apache.org/mod_mbox/tomcat-users/获得)
1、MBoxFileReader.java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
/**
 * Record reader that splits an mbox stream into individual mails and turns each mail into one
 * record.
 *
 * <p>A mail starts at an envelope line matching {@link #ENVELOPE_LINE} and runs up to (but not
 * including) the next envelope line or the end of the file. For each mail the reader produces:
 * <ul>
 *   <li>key: the decimal {@code hashCode()} of {@code from + subject + date};</li>
 *   <li>value: {@code from#subject#date}, where a missing header is rendered as {@code "null"}.</li>
 * </ul>
 *
 * <p>Only the header section (up to the first blank line) is inspected, so "From:" or "Date:"
 * lines inside a forwarded or quoted body cannot override the real headers. The reader always
 * reads the file from its beginning, so the input format using it must not split files.
 *
 * @author Srinath Perera (hemapani@apache.org)
 */
public class MBoxFileReader extends RecordReader<Text, Text> {
    /** Envelope line that separates mails in the Tomcat mailing-list archives. */
    private static final Pattern ENVELOPE_LINE =
            Pattern.compile("From .*tomcat.apache.org@tomcat.apache.org.*");

    private BufferedReader reader;
    /** Underlying file stream; its position is used to estimate progress. */
    private FSDataInputStream in;
    /** Length of the input file in bytes. */
    private long length;
    private Text key;
    private Text value;
    /**
     * Lines of the mail currently being collected. Empty until an envelope line has been seen;
     * {@code null} once the last mail has been returned.
     */
    private StringBuilder email = new StringBuilder();
    /** Last line read from the input. */
    String line = null;

    public MBoxFileReader() {
    }

    /**
     * Opens the file behind {@code inputSplit} and positions the reader on the first mail.
     * Any lines before the first envelope line are skipped. Calling this again closes the
     * previously opened stream first instead of leaking it.
     */
    @Override
    public void initialize(InputSplit inputSplit, TaskAttemptContext attempt)
            throws IOException, InterruptedException {
        close();
        email = new StringBuilder();
        Path path = ((FileSplit) inputSplit).getPath();
        // Resolve the file system from the path itself so non-default schemes (file://, s3a://...) work.
        FileSystem fs = path.getFileSystem(attempt.getConfiguration());
        length = fs.getFileStatus(path).getLen();
        in = fs.open(path);
        // Decode explicitly as UTF-8 (Hadoop Text is UTF-8) instead of the platform default charset.
        reader = new BufferedReader(
                new InputStreamReader(in, java.nio.charset.StandardCharsets.UTF_8));
        while ((line = reader.readLine()) != null) {
            if (ENVELOPE_LINE.matcher(line).matches()) {
                email.append(line).append('\n');
                break;
            }
        }
    }

    /**
     * Advances to the next mail.
     *
     * @return {@code true} if a new key/value pair was parsed, {@code false} when no mail is left
     */
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        // null: the last mail was already returned; empty: no envelope line was ever found
        // (or the reader was never initialized), so there is no mail to emit.
        if (email == null || email.length() == 0) {
            email = null;
            return false;
        }
        while ((line = reader.readLine()) != null) {
            if (ENVELOPE_LINE.matcher(line).matches()) {
                parseEmail(email.toString());
                // The envelope line just read opens the next mail.
                email = new StringBuilder().append(line).append('\n');
                return true;
            }
            email.append(line).append('\n');
        }
        // End of file: the buffered mail is the last one.
        parseEmail(email.toString());
        email = null;
        return true;
    }

    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    /**
     * Returns the fraction of the file consumed, in [0, 1]. The stream position includes the
     * BufferedReader's read-ahead, so this is an estimate.
     */
    @Override
    public float getProgress() throws IOException, InterruptedException {
        if (email == null) {
            return 1.0f;
        }
        if (in == null || length <= 0) {
            return 0.0f;
        }
        return Math.min(1.0f, in.getPos() / (float) length);
    }

    /** Closes the input if it is open; safe to call more than once or before initialization. */
    @Override
    public void close() throws IOException {
        if (reader != null) {
            // Closing the BufferedReader also closes the wrapped FSDataInputStream.
            reader.close();
            reader = null;
            in = null;
        }
    }

    /**
     * Extracts the sender, subject and date of one mail and stores them as the current record.
     *
     * <p>Only the header section is read. It ends at the first blank line. Folded header lines
     * (continuations starting with a space or tab) are joined to the preceding header. Header
     * names are matched case-insensitively, and the first occurrence of each header wins.
     *
     * @param email raw text of one mail with lines separated by '\n'
     */
    public void parseEmail(String email) {
        String from = null;
        String subject = null;
        String date = null;
        for (String header : unfoldHeaders(email).split("\n")) {
            if (from == null && startsWithIgnoreCase(header, "From:")) {
                from = normalizeSender(header.substring(5));
            } else if (subject == null && startsWithIgnoreCase(header, "Subject:")) {
                subject = header.substring(8).trim();
            } else if (date == null && startsWithIgnoreCase(header, "Date:")) {
                date = header.substring(5).trim();
            }
        }
        key = new Text(String.valueOf((from + subject + date).hashCode()));
        value = new Text(from + "#" + subject + "#" + date);
    }

    /** Returns the header lines of {@code email} (up to the first blank line), one unfolded header per line. */
    private static String unfoldHeaders(String email) {
        StringBuilder headers = new StringBuilder();
        for (String rawLine : email.split("\n")) {
            if (rawLine.trim().isEmpty()) {
                break; // blank line: end of the header section
            }
            char firstChar = rawLine.charAt(0);
            boolean continuation = firstChar == ' ' || firstChar == '\t';
            if (continuation && headers.length() > 0) {
                headers.append(' ').append(rawLine.trim());
            } else {
                if (headers.length() > 0) {
                    headers.append('\n');
                }
                headers.append(rawLine);
            }
        }
        return headers.toString();
    }

    /** Case-insensitive {@code startsWith}, since mail header names are case-insensitive. */
    private static boolean startsWithIgnoreCase(String text, String prefix) {
        return text.regionMatches(true, 0, prefix, 0, prefix.length());
    }

    /**
     * Reduces a raw From: header value to a sender token. It removes the {@code <address>}, double
     * quotes, commas, '=' followed by digits and {@code [bracketed]} text, then trims. If nothing
     * remains (e.g. {@code "<a@b.org>"}), the whole trimmed value is used instead. Finally,
     * whitespace becomes '_' and '#' (the record separator) becomes '_'.
     */
    private static String normalizeSender(String raw) {
        String name = raw.replaceAll("<.*>|\\\"|,|=[0-9]*", "")
                .replaceAll("\\[.*?\\]", "")
                .trim();
        if (name.isEmpty()) {
            name = raw.trim();
        }
        return name.replaceAll("\\s", "_").replace('#', '_');
    }
}
===========================================================================
2、MboxFileFormat.java
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
/**
* Used to read Mbox files
* @author Srinath Perera (hemapani@apache.org)
*/
public class MboxFileFormat extends FileInputFormat<Text, Text>{
private MBoxFileReader boxFileReader = null;
@Override
public RecordReader<Text, Text> createRecordReader(
InputSplit inputSplit, TaskAttemptContext attempt) throws IOException,
InterruptedException {
boxFileReader = new MBoxFileReader();
boxFileReader.initialize(inputSplit, attempt);
return boxFileReader;
}
}
=====================================================================================
3、MLReceiveReplyProcessor.java
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import java.util.TreeMap;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
/**
* Find number of owner and replies received by each thread
* @author Srinath Perera (hemapani@apache.org)
*/
public class MLReceiveReplyProcessor {
public static SimpleDateFormat dateFormatter = new SimpleDateFormat("EEEE dd MMM yyyy hh:mm:ss z");
public static class AMapper extends Mapper<Object, Text, Text, Text> {
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
String[] tokens = value.toString().split("#");
String from = tokens[0];
String subject = tokens[1];
String date = tokens[2].replaceAll(",", "");
subject = subject.replaceAll("Re:", "");
context.write(new Text(subject), new Text(date + "#" + from));
}
}
public static class AReducer extends Reducer<Text, Text, Text, Text> {
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
try {
TreeMap<Long, String> replyData = new TreeMap<Long, String>();
for (Text val : values) {
String[] tokens = val.toString().split("#");
if (tokens.length != 2) {
throw new IOException("Unexpected token " + val.toString());
}
String from = tokens[1];
Date date = dateFormatter.parse(tokens[0]);
replyData.put(date.getTime(), from);
}
String owner = replyData.get(replyData.firstKey());
int replyCount = replyData.size();
int selfReplies = 0;
for (String from : replyData.values()) {
if (owner.equals(from)) {
selfReplies++;
}
}
replyCount = replyCount - selfReplies;
context.write(new Text(owner), new Text(replyCount + "#" + selfReplies));
} catch (Exception e) {
System.out.println("ERROR:" + e.getMessage());
return;
// throw new IOException(e);
}
}
}
public static void main(String[] args) throws Exception {
JobConf conf = new JobConf();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: <in> <out>");
System.exit(2);
}
Job job = new Job(conf, "MLReceiveReplyProcessor");
job.setJarByClass(MLReceiveReplyProcessor.class);
job.setMapperClass(AMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// Uncomment this to
// job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(AReducer.class);
job.setInputFormatClass(MboxFileFormat.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}