立即注册 登录
About云-梭伦科技 返回首页

maizhu的个人空间 https://www.aboutyun.com/?8077 [收藏] [复制] [分享] [RSS]

日志

利用hadoop分析复杂的数据集(Tomcat developer mailing list archives)

已有 1277 次阅读2014-10-29 16:52 |个人分类:HADOOP_MAPRED

在本文中,将对Tomcat developer mailing list archives的数据中每个人发送的邮件的回复数量进行统计。因为在原数据中所需要得到的值分布在不同的行,因此需要自定义InputFormat和RecordReader。利用自定义的InputFormat和RecordReader获取到邮件的subject、from、date,然后通过map过程,以subject为key,以from#date为value,最后在reduce过程中,对每个主题(subject)中的values按照时间进行排序,最早发送邮件的为发件人,其他的都为回复者,以此统计每个发件人得到的回复数量。在reduce中用到了Java中的TreeMap。具体代码如下:
(数据集可以从 http://mail-archives.apache.org/mod_mbox/tomcat-users/ 获得)
1、MBoxFileReader.java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

/**
 * Parse each mail line by line from MBox stream 
 * @author Srinath Perera (hemapani@apache.org)
 */

public class MBoxFileReader extends RecordReader<Text, Text> {
    private static Pattern pattern1 = Pattern.compile("From .*tomcat.apache.org@tomcat.apache.org.*");
    private BufferedReader reader;
    private int count = 0;
    private Text key;
    private Text value;
    private StringBuffer email = new StringBuffer();
    String line = null;

    public MBoxFileReader() {
    }

    @Override
    public void initialize(InputSplit inputSplit, TaskAttemptContext attempt) throws IOException, InterruptedException {
        Path path = ((FileSplit) inputSplit).getPath();

        FileSystem fs = FileSystem.get(attempt.getConfiguration());
        FSDataInputStream fsStream = fs.open(path);
        reader = new BufferedReader(new InputStreamReader(fsStream));

        while ((line = reader.readLine()) != null) {
            Matcher matcher = pattern1.matcher(line);
            if (matcher.matches()) {
                email.append(line).append("\n");
                break;
            }
        }
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (email == null) {
            return false;
        }
        count++;
        while ((line = reader.readLine()) != null) {
            Matcher matcher = pattern1.matcher(line);
            if (!matcher.matches()) {
                email.append(line).append("\n");
            } else {
                parseEmail(email.toString());
                email = new StringBuffer();
                email.append(line).append("\n");
                return true;
            }
        }
        parseEmail(email.toString());
        email = null;
        return true;
    }

    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return count;
    }

    @Override
    public void close() throws IOException {
        reader.close();
    }

    public void parseEmail(String email) {
        String[] tokens = email.split("\n");
        String from = null;
        String subject = null;
        String date = null;

        for (String token : tokens) {
            if (token.contains(":")) {
                if (token.startsWith("From:")) {
                    from = token.substring(5).replaceAll("<.*>|\\\"|,|=[0-9]*", "").replaceAll("\\[.*?\\]", "")
                            .replaceAll("\\s", "_").trim();
                } else if (token.startsWith("Subject:")) {
                    subject = token.substring(8).trim();
                } else if (token.startsWith("Date:")) {
                    date = token.substring(5).trim();
                }
            }
        }

        key = new Text(String.valueOf((from + subject + date).hashCode()));
        value = new Text(from + "#" + subject + "#" + date);
    }
}

===========================================================================
2、MboxFileFormat.java

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

/**
 * Used to read Mbox files 
 * @author Srinath Perera (hemapani@apache.org)
 */
public class MboxFileFormat extends FileInputFormat<Text, Text>{
    private MBoxFileReader boxFileReader = null; 
        
   
    @Override
    public RecordReader<Text, Text> createRecordReader(
            InputSplit inputSplit, TaskAttemptContext attempt) throws IOException,
            InterruptedException {
        boxFileReader = new MBoxFileReader();
        boxFileReader.initialize(inputSplit, attempt);
        return boxFileReader;
    }

}

=====================================================================================
3、MLReceiveReplyProcessor.java

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TreeMap;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

/**
 * Find number of owner and replies received by each thread 
 * @author Srinath Perera (hemapani@apache.org)
 */
public class MLReceiveReplyProcessor {
    public static SimpleDateFormat dateFormatter = new SimpleDateFormat("EEEE dd MMM yyyy hh:mm:ss z");

    public static class AMapper extends Mapper<Object, Text, Text, Text> {

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String[] tokens = value.toString().split("#");
            String from = tokens[0];
            String subject = tokens[1];
            String date = tokens[2].replaceAll(",", "");
            subject = subject.replaceAll("Re:", "");
            context.write(new Text(subject), new Text(date + "#" + from));
        }
    }

    public static class AReducer extends Reducer<Text, Text, Text, Text> {

        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            try {
                TreeMap<Long, String> replyData = new TreeMap<Long, String>();
                for (Text val : values) {
                    String[] tokens = val.toString().split("#");
                    if (tokens.length != 2) {
                        throw new IOException("Unexpected token " + val.toString());
                    }
                    String from = tokens[1];
                    Date date = dateFormatter.parse(tokens[0]);
                    replyData.put(date.getTime(), from);
                }
                String owner = replyData.get(replyData.firstKey());
                int replyCount = replyData.size();
                int selfReplies = 0;
                for (String from : replyData.values()) {
                    if (owner.equals(from)) {
                        selfReplies++;
                    }
                }
                replyCount = replyCount - selfReplies;

                context.write(new Text(owner), new Text(replyCount + "#" + selfReplies));
            } catch (Exception e) {
                System.out.println("ERROR:" + e.getMessage());
                return;
                // throw new IOException(e);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: <in> <out>");
            System.exit(2);
        }

        Job job = new Job(conf, "MLReceiveReplyProcessor");
        job.setJarByClass(MLReceiveReplyProcessor.class);
        job.setMapperClass(AMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // Uncomment this to
        // job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(AReducer.class);
        job.setInputFormatClass(MboxFileFormat.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}


路过

雷人

握手

鲜花

鸡蛋

评论 (0 个评论)

facelist doodle 涂鸦板

您需要登录后才可以评论 登录 | 立即注册

关闭

推荐上一条 /2 下一条