分享

朴素贝叶斯之MapReduce版

fc013 发表于 2016-7-2 19:23:58 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 1 10406
本帖最后由 fc013 于 2016-7-3 10:44 编辑
问题导读:

1.怎样计算类别的先验概率?
2.怎样计算每个词的条件概率?
3.什么是假设二分类问题?




统计词出现的次数
1/计算类别的先验概率

*输入格式:类别+文档id+文档词(切分成A,b,c)

*输出格式:类别+文档出现次数+文档出现的词的总数

2/计算每个词的条件概率

*输入格式:类别+文档id+文档词(切分成A,b,c)

*输出格式:类别+词+词的总数

3/假设二分类问题-计算概率值

* 1类别+文档出现次数+文档出现的词的总数

* 2类别+词+词的总数

* 3类别+词+log(词的总数/文档出现的词的总数),类别-log(文档出现次数/sum(文档出现次数))

* 输入格式:类别+词+词的总数

* 输出格式:"词","类别+log()值概率"+1,2+类别的先验概率


* 4/假设二分类问题-测试

* 1类别+文档出现次数+文档出现的词的总数

* 2类别+词+词的总数

* 3类别+词+log(词的总数/文档出现的词的总数),类别-log(文档出现次数/sum(文档出现次数))

*输入格式:新文档id+文档词(切分成A,b,c)

*输出格式:新文档id+类别


这个版本基本写了MapReduce的朴素贝叶斯思路--具体优化和修改以后再弄

Python版实现
R语言版调用函数


Bayes1

[mw_shl_code=java,true]package com.ml.mapreduce;  
  
import java.io.IOException;  
  
import org.apache.hadoop.conf.Configuration;  
import org.apache.hadoop.conf.Configured;  
import org.apache.hadoop.fs.Path;  
import org.apache.hadoop.io.IntWritable;  
import org.apache.hadoop.io.LongWritable;  
import org.apache.hadoop.io.Text;  
import org.apache.hadoop.mapreduce.Job;  
import org.apache.hadoop.mapreduce.Mapper;  
import org.apache.hadoop.mapreduce.Reducer;  
import org.apache.hadoop.mapreduce.Reducer.Context;  
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;  
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;  
import org.apache.hadoop.util.Tool;  
import org.apache.hadoop.util.ToolRunner;  
  
/**
* 1/计算类别的先验概率
* 汇总到dict1.txt
*输入格式:类别+文档id+文档词(切分成A,b,c)
*输出格式:类别+文档出现次数+文档出现的词的总数
*/  
public class Bayes1 extends Configured implements Tool {  
  
    public static enum Counter {  
        PARSER_ERR  
    }  
  
    public static class MyMap extends Mapper<LongWritable, Text, Text, Text> {  
        private Text mykey = new Text();// 类别id  
        private Text myval = new Text();// 文档id+文档长度  
  
        protected void map(LongWritable key, Text value, Context context)  
                throws IOException, InterruptedException {  
            String[] array = value.toString().split(",");  
            String[] doc=array[2].split("-");  
            mykey.set(array[0]);  
            myval.set("1"+","+doc.length);  
            context.write(mykey, myval);  
        };  
    }  
  
    public static class MyReduce extends Reducer<Text, Text, Text, Text> {  
        private Text val = new Text();  
  
        protected void reduce(Text key, Iterable<Text> values, Context context)  
                throws IOException, InterruptedException {  
            // 用于计算该类别总的个数  
            int sum = 0;  
            //计算出现词的总个数  
            int wordsum = 0;  
            // 循环遍历 Interable  
            for (Text value : values) {  
                // 累加  
                String[] array = value.toString().split(",");  
                sum += Integer.parseInt(array[0]);  
                wordsum += Integer.parseInt(array[1]);  
                val.set(sum+","+wordsum);  
            }  
            context.write(key, val);  
        };  
    }  
  
    @Override  
    public int run(String[] args) throws Exception {  
        // 1 conf  
        Configuration conf = new Configuration();  
        conf.set("mapred.textoutputformat.separator", ",");// key value分隔符  
        // 2 create job  
        // Job job = new Job(conf, ModuleMapReduce.class.getSimpleName());  
        Job job = this.parseInputAndOutput(this, conf, args);  
        // 3 set job  
        // 3.1 set run jar class  
        // job.setJarByClass(ModuleReducer.class);  
        // 3.2 set intputformat  
        job.setInputFormatClass(TextInputFormat.class);  
        // 3.3 set input path  
        // FileInputFormat.addInputPath(job, new Path(args[0]));  
        // 3.4 set mapper  
        job.setMapperClass(MyMap.class);  
        // 3.5 set map output key/value class  
        job.setMapOutputKeyClass(Text.class);  
        job.setMapOutputValueClass(Text.class);  
        // 3.6 set partitioner class  
        // job.setPartitionerClass(HashPartitioner.class);  
        // 3.7 set reduce number  
        // job.setNumReduceTasks(1);  
        // 3.8 set sort comparator class  
        // job.setSortComparatorClass(LongWritable.Comparator.class);  
        // 3.9 set group comparator class  
        // job.setGroupingComparatorClass(LongWritable.Comparator.class);  
        // 3.10 set combiner class  
        // job.setCombinerClass(null);  
        // 3.11 set reducer class  
        job.setReducerClass(MyReduce.class);  
        // 3.12 set output format  
  
        job.setOutputFormatClass(TextOutputFormat.class);  
        // 3.13 job output key/value class  
        job.setOutputKeyClass(Text.class);  
        job.setOutputValueClass(Text.class);  
        // 3.14 set job output path  
        // FileOutputFormat.setOutputPath(job, new Path(args[1]));  
        // 4 submit job  
        boolean isSuccess = job.waitForCompletion(true);  
        // 5 exit  
        // System.exit(isSuccess ? 0 : 1);  
        return isSuccess ? 0 : 1;  
    }  
  
    public Job parseInputAndOutput(Tool tool, Configuration conf, String[] args)  
            throws Exception {  
        // validate  
        if (args.length != 2) {  
            System.err.printf("Usage:%s [genneric options]<input><output>\n",  
                    tool.getClass().getSimpleName());  
            ToolRunner.printGenericCommandUsage(System.err);  
            return null;  
        }  
        // 2 create job  
        Job job = new Job(conf, tool.getClass().getSimpleName());  
        // 3.1 set run jar class  
        job.setJarByClass(tool.getClass());  
        // 3.3 set input path  
        FileInputFormat.addInputPath(job, new Path(args[0]));  
        // 3.14 set job output path  
        FileOutputFormat.setOutputPath(job, new Path(args[1]));  
  
        return job;  
    }  
  
    public static void main(String[] args) throws Exception {  
        args = new String[] {  
                "hdfs://192.168.192.129:9000/ml/bayesTrain.txt",  
                // "hdfs://hadoop-00:9000/home910/liyuting/output/" };  
                "hdfs://192.168.192.129:9000/ml/bayes/" };  
        // run mapreduce  
        int status = ToolRunner.run(new Bayes1(), args);  
        // 5 exit  
        System.exit(status);  
    }  
}  [/mw_shl_code]

Bayes2

[mw_shl_code=java,true]import java.io.IOException;  
  
import org.apache.hadoop.conf.Configuration;  
import org.apache.hadoop.conf.Configured;  
import org.apache.hadoop.fs.Path;  
import org.apache.hadoop.io.IntWritable;  
import org.apache.hadoop.io.LongWritable;  
import org.apache.hadoop.io.Text;  
import org.apache.hadoop.mapreduce.Job;  
import org.apache.hadoop.mapreduce.Mapper;  
import org.apache.hadoop.mapreduce.Reducer;  
import org.apache.hadoop.mapreduce.Reducer.Context;  
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;  
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;  
import org.apache.hadoop.util.Tool;  
import org.apache.hadoop.util.ToolRunner;  
  
/**
* 2/计算每个词的条件概率
* 汇总到dict2.txt
*输入格式:类别+文档id+文档词(切分成A,b,c)
*输出格式:类别+词+词的总数
*/  
public class Bayes2 extends Configured implements Tool {  
  
    public static enum Counter {  
        PARSER_ERR  
    }  
  
    public static class MyMap extends Mapper<LongWritable, Text, Text, Text> {  
        private Text mykey = new Text();//类别+词  
        private Text myval = new Text();//出现个数  
  
        protected void map(LongWritable key, Text value, Context context)  
                throws IOException, InterruptedException {  
            String[] array = value.toString().split(",");  
            String[] doc=array[2].split("-");  
            for (String str : doc) {  
                mykey.set(array[0]+ ","+ str);  
                myval.set("1");  
                context.write(mykey, myval);  
            }  
        };  
    }  
  
    public static class MyReduce extends Reducer<Text, Text, Text, Text> {  
        private Text val = new Text();  
  
        protected void reduce(Text key, Iterable<Text> values, Context context)  
                throws IOException, InterruptedException {  
            // 用于计算每个类别里面每个词出现的总数  
            int sum = 0;  
            // 循环遍历 Interable  
            for (Text value : values) {  
                // 累加  
                String array = value.toString();  
                sum += Integer.parseInt(array);  
                val.set(sum + "");  
            }  
            context.write(key, val);  
        };  
    }  
  
    @Override  
    public int run(String[] args) throws Exception {  
        // 1 conf  
        Configuration conf = new Configuration();  
        conf.set("mapred.textoutputformat.separator", ",");// key value分隔符  
        // 2 create job  
        // Job job = new Job(conf, ModuleMapReduce.class.getSimpleName());  
        Job job = this.parseInputAndOutput(this, conf, args);  
        // 3 set job  
        // 3.1 set run jar class  
        // job.setJarByClass(ModuleReducer.class);  
        // 3.2 set intputformat  
        job.setInputFormatClass(TextInputFormat.class);  
        // 3.3 set input path  
        // FileInputFormat.addInputPath(job, new Path(args[0]));  
        // 3.4 set mapper  
        job.setMapperClass(MyMap.class);  
        // 3.5 set map output key/value class  
        job.setMapOutputKeyClass(Text.class);  
        job.setMapOutputValueClass(Text.class);  
        // 3.6 set partitioner class  
        // job.setPartitionerClass(HashPartitioner.class);  
        // 3.7 set reduce number  
        // job.setNumReduceTasks(1);  
        // 3.8 set sort comparator class  
        // job.setSortComparatorClass(LongWritable.Comparator.class);  
        // 3.9 set group comparator class  
        // job.setGroupingComparatorClass(LongWritable.Comparator.class);  
        // 3.10 set combiner class  
        // job.setCombinerClass(null);  
        // 3.11 set reducer class  
        job.setReducerClass(MyReduce.class);  
        // 3.12 set output format  
  
        job.setOutputFormatClass(TextOutputFormat.class);  
        // 3.13 job output key/value class  
        job.setOutputKeyClass(Text.class);  
        job.setOutputValueClass(Text.class);  
        // 3.14 set job output path  
        // FileOutputFormat.setOutputPath(job, new Path(args[1]));  
        // 4 submit job  
        boolean isSuccess = job.waitForCompletion(true);  
        // 5 exit  
        // System.exit(isSuccess ? 0 : 1);  
        return isSuccess ? 0 : 1;  
    }  
  
    public Job parseInputAndOutput(Tool tool, Configuration conf, String[] args)  
            throws Exception {  
        // validate  
        if (args.length != 2) {  
            System.err.printf("Usage:%s [genneric options]<input><output>\n",  
                    tool.getClass().getSimpleName());  
            ToolRunner.printGenericCommandUsage(System.err);  
            return null;  
        }  
        // 2 create job  
        Job job = new Job(conf, tool.getClass().getSimpleName());  
        // 3.1 set run jar class  
        job.setJarByClass(tool.getClass());  
        // 3.3 set input path  
        FileInputFormat.addInputPath(job, new Path(args[0]));  
        // 3.14 set job output path  
        FileOutputFormat.setOutputPath(job, new Path(args[1]));  
  
        return job;  
    }  
  
    public static void main(String[] args) throws Exception {  
        args = new String[] {  
                "hdfs://192.168.192.129:9000/ml/bayesTrain.txt",  
                // "hdfs://hadoop-00:9000/home910/liyuting/output/" };  
                "hdfs://192.168.192.129:9000/ml/bayes/pword/" };  
        // run mapreduce  
        int status = ToolRunner.run(new Bayes2(), args);  
        // 5 exit  
        System.exit(status);  
    }  
}  
[/mw_shl_code]

Bayes3

[mw_shl_code=java,true]package com.ml.mapreduce;  
  
import java.io.BufferedReader;  
import java.io.FileReader;  
import java.io.IOException;  
import java.util.HashMap;  
import java.util.Map;  
  
import org.apache.hadoop.conf.Configuration;  
import org.apache.hadoop.conf.Configured;  
import org.apache.hadoop.filecache.DistributedCache;  
import org.apache.hadoop.fs.Path;  
import org.apache.hadoop.io.IntWritable;  
import org.apache.hadoop.io.LongWritable;  
import org.apache.hadoop.io.Text;  
import org.apache.hadoop.mapreduce.Job;  
import org.apache.hadoop.mapreduce.Mapper;  
import org.apache.hadoop.mapreduce.Reducer;  
import org.apache.hadoop.mapreduce.Reducer.Context;  
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;  
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;  
import org.apache.hadoop.util.Tool;  
import org.apache.hadoop.util.ToolRunner;  
  
/**
* 3/假设二分类问题-计算概率值  
* 1类别+文档出现次数+文档出现的词的总数  
* 2类别+词+词的总数
* 3类别+词+log(词的总数/文档出现的词的总数),类别-log(文档出现次数/sum(文档出现次数))
*  
* 输入格式:类别+词+词的总数  
* 输出格式:"词","类别+log()值概率"+1,2+类别的先验概率
*/  
public class Bayes3 extends Configured implements Tool {  
  
    public static enum Counter {  
        PARSER_ERR  
    }  
  
    public static class MyMap extends Mapper<LongWritable, Text, Text, Text> {  
        private Text mykey = new Text();// 类别+词  
        private Text myval = new Text();// 出现个数  
  
        protected void map(LongWritable key, Text value, Context context)  
                throws IOException, InterruptedException {  
             BufferedReader br = null;  
                //获得当前作业的DistributedCache相关文件  
                Path[] distributePaths = DistributedCache.getLocalCacheFiles(context.getConfiguration());  
                String lines = null;  
                String[] class1 = {"0","0"};  
                String[] class0 = {"0","0"};  
                for(Path p : distributePaths){  
                    if(p.getParent().toString().endsWith("bayes")){  
                        //读缓存文件,并放到mem中  
                        br = new BufferedReader(new FileReader(p.toString()));  
                        while(null!=(lines=br.readLine())){  
                            String[] pall= lines.split(",");  
                            if (pall[0].equals("1")) {  
                                class1[0]=pall[1];  
                                class1[1]=pall[2];  
                            }else {  
                                class0[0]=pall[1];  
                                class0[1]=pall[2];  
                            }  
                        }  
                    }  
                }  
            String[] array = value.toString().split(",");  
            Double plog=0.0;  
            if (array[0].equals("1")) {  
                mykey.set(array[1]);// 词  
                plog=Math.log(Double.parseDouble(array[2])/Double.parseDouble(class1[1]));  
                myval.set(array[0]+","+plog);// 类别+log概率  
                context.write(mykey, myval);  
            }else {  
                mykey.set(array[1]);// 词  
                plog=Math.log(Double.parseDouble(array[2])/Double.parseDouble(class0[1]));  
                myval.set(array[0]+","+plog);// 类别+log概率  
                context.write(mykey, myval);  
            }  
              
        };  
    }  
  
    public static class MyReduce extends Reducer<Text, Text, Text, Text> {  
        private Text val = new Text();  
  
        protected void reduce(Text key, Iterable<Text> values, Context context)  
                throws IOException, InterruptedException {  
              
            String vals="tab";  
            for (Text value : values) {  
                // 累加  
                vals=vals+","+value.toString();  
            }  
            val.set(vals);  
            context.write(key, val);  
        };  
    }  
  
    @Override  
    public int run(String[] args) throws Exception {  
        // 1 conf  
        Configuration conf = new Configuration();  
        conf.set("mapred.textoutputformat.separator", ",");// key value分隔符  
        DistributedCache.addCacheFile(new Path(args[2]).toUri(), conf);//为该job添加缓存文件  
        // 2 create job  
        // Job job = new Job(conf, ModuleMapReduce.class.getSimpleName());  
        Job job = this.parseInputAndOutput(this, conf, args);  
        // 3 set job  
        // 3.1 set run jar class  
        // job.setJarByClass(ModuleReducer.class);  
        // 3.2 set intputformat  
        job.setInputFormatClass(TextInputFormat.class);  
        // 3.3 set input path  
        // FileInputFormat.addInputPath(job, new Path(args[0]));  
        // 3.4 set mapper  
        job.setMapperClass(MyMap.class);  
        // 3.5 set map output key/value class  
        job.setMapOutputKeyClass(Text.class);  
        job.setMapOutputValueClass(Text.class);  
        // 3.6 set partitioner class  
        // job.setPartitionerClass(HashPartitioner.class);  
        // 3.7 set reduce number  
//       job.setNumReduceTasks(0);  
        // 3.8 set sort comparator class  
        // job.setSortComparatorClass(LongWritable.Comparator.class);  
        // 3.9 set group comparator class  
        // job.setGroupingComparatorClass(LongWritable.Comparator.class);  
        // 3.10 set combiner class  
        // job.setCombinerClass(null);  
        // 3.11 set reducer class  
        job.setReducerClass(MyReduce.class);  
        // 3.12 set output format  
  
        job.setOutputFormatClass(TextOutputFormat.class);  
        // 3.13 job output key/value class  
        job.setOutputKeyClass(Text.class);  
        job.setOutputValueClass(Text.class);  
        // 3.14 set job output path  
        // FileOutputFormat.setOutputPath(job, new Path(args[1]));  
        // 4 submit job  
        boolean isSuccess = job.waitForCompletion(true);  
        // 5 exit  
        // System.exit(isSuccess ? 0 : 1);  
        return isSuccess ? 0 : 1;  
    }  
  
    public Job parseInputAndOutput(Tool tool, Configuration conf, String[] args)  
            throws Exception {  
        // validate  
//      if (args.length != 2) {  
//          System.err.printf("Usage:%s [genneric options]<input><output>\n",  
//                  tool.getClass().getSimpleName());  
//          ToolRunner.printGenericCommandUsage(System.err);  
//          return null;  
//      }  
        // 2 create job  
        Job job = new Job(conf, tool.getClass().getSimpleName());  
        // 3.1 set run jar class  
        job.setJarByClass(tool.getClass());  
        // 3.3 set input path  
        FileInputFormat.addInputPath(job, new Path(args[0]));  
        // 3.14 set job output path  
        FileOutputFormat.setOutputPath(job, new Path(args[1]));  
  
        return job;  
    }  
  
    public static void main(String[] args) throws Exception {  
        args = new String[] {  
                "hdfs://192.168.192.129:9000/ml/bayes/pword/part-r-00000",  
                // "hdfs://hadoop-00:9000/home910/liyuting/output/" };  
                "hdfs://192.168.192.129:9000/ml/bayes/pall/",  
                "hdfs://192.168.192.129:9000/ml/bayes/part-r-00000"};  
        // run mapreduce  
        int status = ToolRunner.run(new Bayes3(), args);  
        // 5 exit  
        System.exit(status);  
    }  
}  [/mw_shl_code]

Bayes4

[mw_shl_code=java,true]package com.ml.mapreduce;  
  
import java.io.BufferedReader;  
import java.io.FileReader;  
import java.io.IOException;  
import java.util.HashMap;  
import java.util.Map;  
  
import org.apache.hadoop.conf.Configuration;  
import org.apache.hadoop.conf.Configured;  
import org.apache.hadoop.filecache.DistributedCache;  
import org.apache.hadoop.fs.Path;  
import org.apache.hadoop.io.IntWritable;  
import org.apache.hadoop.io.LongWritable;  
import org.apache.hadoop.io.Text;  
import org.apache.hadoop.mapreduce.Job;  
import org.apache.hadoop.mapreduce.Mapper;  
import org.apache.hadoop.mapreduce.Reducer;  
import org.apache.hadoop.mapreduce.Reducer.Context;  
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;  
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;  
import org.apache.hadoop.util.Tool;  
import org.apache.hadoop.util.ToolRunner;  
  
/**
* 4/假设二分类问题-测试
* 1类别+文档出现次数+文档出现的词的总数
* 2类别+词+词的总数
* 3类别+词+log(词的总数/文档出现的词的总数),类别-log(文档出现次数/sum(文档出现次数))
*  
*输入格式:新文档id+文档词(切分成A,b,c)
*输出格式:新文档id+类别
*/  
public class Bayes4 extends Configured implements Tool {  
  
    public static enum Counter {  
        PARSER_ERR  
    }  
  
    public static class MyMap extends Mapper<LongWritable, Text, Text, Text> {  
        private Text mykey = new Text();//类别+词  
        private Text myval = new Text();//出现个数  
        Map zidianString=new HashMap();//key是词 value是概率值-假设字典可以读到内存中//不能的话切分读取  
        protected void map(LongWritable key, Text value, Context context)  
                throws IOException, InterruptedException {  
             BufferedReader br = null;  
                //获得当前作业的DistributedCache相关文件  
                Path[] distributePaths = DistributedCache.getLocalCacheFiles(context.getConfiguration());  
                String lines = null;  
                for(Path p : distributePaths){  
                    if(p.getParent().toString().endsWith("pall")){  
                        //读缓存文件,并放到mem中  
                        br = new BufferedReader(new FileReader(p.toString()));  
                        while(null!=(lines=br.readLine())){  
                            String[] pall= lines.split(",");  
                            if (pall.length>4) {  
                                if (pall[2].equals("1")) {  
                                    zidianString.put(pall[0], pall[2]+","+pall[3]+","+pall[4]+","+pall[5]);  
                                }else {  
                                    zidianString.put(pall[0], pall[4]+","+pall[5]+","+pall[2]+","+pall[3]);  
                                }  
                            }else {  
                                if (pall[2].equals("1")) {  
                                    zidianString.put(pall[0], pall[2]+","+pall[3]+","+"0"+","+"0.0");  
                                }else {  
                                    zidianString.put(pall[0], "1"+","+"0.0"+","+pall[2]+","+pall[3]);  
                                }  
                                 
                            }  
                        }  
                    }  
                }  
            String[] array = value.toString().split(",");  
            String[] doc=array[1].split("-");  
            for (String str : doc) {  
                if (zidianString.containsKey(str)) {  
                    String[] kk=zidianString.get(str).toString().split(",");//类别+概率  
                    mykey.set(array[0]);//文档id  
                    myval.set(kk[0]+","+kk[1]+","+kk[2]+","+kk[3]);//类别+log概率  
                    context.write(mykey, myval);  
                }  
            }  
        };  
    }  
  
    public static class MyReduce extends Reducer<Text, Text, Text, Text> {  
        private Text val = new Text();  
  
        protected void reduce(Text key, Iterable<Text> values, Context context)  
                throws IOException, InterruptedException {  
            // 用于计算每个类别里面词的概率  
            Double sum=0.5;//类别1的先验概率 --需要提前算好0-0这里可以考虑读入--等有空再修改  
            Double sum2=0.5;//类别0的先验概率   
            // 循环遍历 Interable  
            for (Text value : values) {  
                // 累加  
                String[] array = value.toString().split(",");  
                    sum += Double.parseDouble(array[1]);//似然概率  
                    sum2 += Double.parseDouble(array[3]);//似然概率  
            }  
            if (sum>sum2) {  
                val.set("类别1");  
            }else {  
                val.set("类别0");  
            }  
            context.write(key, val);  
        };  
    }  
  
    @Override  
    public int run(String[] args) throws Exception {  
        // 1 conf  
        Configuration conf = new Configuration();  
        conf.set("mapred.textoutputformat.separator", ",");// key value分隔符  
        DistributedCache.addCacheFile(new Path(args[2]).toUri(), conf);//为该job添加缓存文件  
        // 2 create job  
        // Job job = new Job(conf, ModuleMapReduce.class.getSimpleName());  
        Job job = this.parseInputAndOutput(this, conf, args);  
        // 3 set job  
        // 3.1 set run jar class  
        // job.setJarByClass(ModuleReducer.class);  
        // 3.2 set intputformat  
        job.setInputFormatClass(TextInputFormat.class);  
        // 3.3 set input path  
        // FileInputFormat.addInputPath(job, new Path(args[0]));  
        // 3.4 set mapper  
        job.setMapperClass(MyMap.class);  
        // 3.5 set map output key/value class  
        job.setMapOutputKeyClass(Text.class);  
        job.setMapOutputValueClass(Text.class);  
        // 3.6 set partitioner class  
        // job.setPartitionerClass(HashPartitioner.class);  
        // 3.7 set reduce number  
//       job.setNumReduceTasks(0);  
        // 3.8 set sort comparator class  
        // job.setSortComparatorClass(LongWritable.Comparator.class);  
        // 3.9 set group comparator class  
        // job.setGroupingComparatorClass(LongWritable.Comparator.class);  
        // 3.10 set combiner class  
        // job.setCombinerClass(null);  
        // 3.11 set reducer class  
        job.setReducerClass(MyReduce.class);  
        // 3.12 set output format  
  
        job.setOutputFormatClass(TextOutputFormat.class);  
        // 3.13 job output key/value class  
        job.setOutputKeyClass(Text.class);  
        job.setOutputValueClass(Text.class);  
        // 3.14 set job output path  
        // FileOutputFormat.setOutputPath(job, new Path(args[1]));  
        // 4 submit job  
        boolean isSuccess = job.waitForCompletion(true);  
        // 5 exit  
        // System.exit(isSuccess ? 0 : 1);  
        return isSuccess ? 0 : 1;  
    }  
  
    public Job parseInputAndOutput(Tool tool, Configuration conf, String[] args)  
            throws Exception {  
        // validate  
//      if (args.length != 2) {  
//          System.err.printf("Usage:%s [genneric options]<input><output>\n",  
//                  tool.getClass().getSimpleName());  
//          ToolRunner.printGenericCommandUsage(System.err);  
//          return null;  
//      }  
        // 2 create job  
        Job job = new Job(conf, tool.getClass().getSimpleName());  
        // 3.1 set run jar class  
        job.setJarByClass(tool.getClass());  
        // 3.3 set input path  
        FileInputFormat.addInputPath(job, new Path(args[0]));  
        // 3.14 set job output path  
        FileOutputFormat.setOutputPath(job, new Path(args[1]));  
  
        return job;  
    }  
  
    public static void main(String[] args) throws Exception {  
        args = new String[] {  
                "hdfs://192.168.192.129:9000/ml/test.txt",  
                // "hdfs://hadoop-00:9000/home910/liyuting/output/" };  
                "hdfs://192.168.192.129:9000/ml/bayes/result/",  
                "hdfs://192.168.192.129:9000/ml/bayes/pall/part-r-00000"};  
        // run mapreduce  
        int status = ToolRunner.run(new Bayes4(), args);  
        // 5 exit  
        System.exit(status);  
    }  
}  
[/mw_shl_code]


来自:余音丶未散


已有(1)人评论

跳转到指定楼层
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条