分享

hadoop2.2的mapreduce问题

有妳很满促 发表于 2013-12-16 12:43:14 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 11 11328
抢楼 本帖为抢楼帖,欢迎抢楼! 
public class WordCountTx {
    public static class MapClass extends MapReduceBase
    implements Mapper<LongWritable, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

       @Override
       public void map(LongWritable key, Text value,
                     OutputCollector<Text, IntWritable> output,
            Reporter reporter) throws IOException {
              String line = value.toString();
              StringTokenizer itr = new StringTokenizer(line);
              while (itr.hasMoreTokens()) {
                     word.set(itr.nextToken());
                     output.collect(word, one);
              }
  }
  }

  public static class Reduce extends MapReduceBase
    implements Reducer<Text, IntWritable, Text, IntWritable> {

       @Override
    public void reduce(Text key, Iterator<IntWritable> values,
                       OutputCollector<Text, IntWritable> output,
                       Reporter reporter) throws IOException {
      int sum = 0;
      while (values.hasNext()) {
        sum += values.next().get();
      }
      output.collect(key, new IntWritable(sum));
    }
  }

  public static void main(String[] args) throws Exception {

         JobConf conf = new JobConf(WordCountTx.class);
         // hdfs地址
         conf.set("fs.default.name", "hdfs://192.168.1.238:9000");
         conf.set("mapred.job.tracker","hdfs://192.168.1.238:9009");
         // 设置jar
         File jarFile = EightSpaceJob.createTempJar("bin");

         EightSpaceJob.addClasspath("/usr/hadoop/etc/hadoop");

         ClassLoader classLoader = EightSpaceJob.getClassLoader();

         Thread.currentThread().setContextClassLoader(classLoader);

         conf.setJar(jarFile.toString());

         conf.setJobName("wordcounttxt");

         conf.setOutputKeyClass(Text.class);
         conf.setOutputValueClass(IntWritable.class);

         conf.setMapperClass(MapClass.class);
         conf.setReducerClass(Reduce.class);

         FileInputFormat.setInputPaths(conf, new Path("/inpath/txttest"));
         FileOutputFormat.setOutputPath(conf, new Path("/logAalyse"));

         JobClient.runJob(conf);   
  }   
}
上边的代码是我写的,大家看看有什么问题,然后运行的时候报错如下
log4j:WARN No appenders could be found for logger (org.apache.hadoop.conf.Configuration.deprecation).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Exception in thread "main" java.io.IOException: Cannot initialize Cluster. Please check your configuration for mapreduce.framework.name and the correspond server addresses.
        at org.apache.hadoop.mapreduce.Cluster.initialize(Cluster.java:120)
        at org.apache.hadoop.mapreduce.Cluster.<init>(Cluster.java:82)
        at org.apache.hadoop.mapreduce.Cluster.<init>(Cluster.java:75)
        at org.apache.hadoop.mapred.JobClient.init(JobClient.java:470)
        at org.apache.hadoop.mapred.JobClient.<init>(JobClient.java:449)
        at org.apache.hadoop.mapred.JobClient.runJob(JobClient.java:832)
        at com.eightsapce.mapreducetxt.WordCountTx.main(WordCountTx.java:96)


已有(11)人评论

跳转到指定楼层
有妳很满促 发表于 2013-12-16 12:50:08

hadoop2.2的mapreduce的使用

public class WordCountTx {
    public static class MapClass extends MapReduceBase
    implements Mapper<LongWritable, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

       @Override
       public void map(LongWritable key, Text value,
                     OutputCollector<Text, IntWritable> output,
            Reporter reporter) throws IOException {
              String line = value.toString();
              StringTokenizer itr = new StringTokenizer(line);
              while (itr.hasMoreTokens()) {
                     word.set(itr.nextToken());
                     output.collect(word, one);
              }
  }
  }

  public static class Reduce extends MapReduceBase
    implements Reducer<Text, IntWritable, Text, IntWritable> {

       @Override
    public void reduce(Text key, Iterator<IntWritable> values,
                       OutputCollector<Text, IntWritable> output,
                       Reporter reporter) throws IOException {
      int sum = 0;
      while (values.hasNext()) {
        sum += values.next().get();
      }
      output.collect(key, new IntWritable(sum));
    }
  }

  public static void main(String[] args) throws Exception {

         JobConf conf = new JobConf(WordCountTx.class);
         // hdfs地址
         conf.set("fs.default.name", "hdfs://192.168.1.238:9000");
         conf.set("mapred.job.tracker","hdfs://192.168.1.238:9009");
         // 设置jar
         File jarFile = EightSpaceJob.createTempJar("bin");

         EightSpaceJob.addClasspath("/usr/hadoop/etc/hadoop");

         ClassLoader classLoader = EightSpaceJob.getClassLoader();

         Thread.currentThread().setContextClassLoader(classLoader);

         conf.setJar(jarFile.toString());

         conf.setJobName("wordcounttxt");

         conf.setOutputKeyClass(Text.class);
         conf.setOutputValueClass(IntWritable.class);

         conf.setMapperClass(MapClass.class);
         conf.setReducerClass(Reduce.class);

         FileInputFormat.setInputPaths(conf, new Path("/inpath/txttest"));
         FileOutputFormat.setOutputPath(conf, new Path("/logAalyse"));

         JobClient.runJob(conf);   
  }   
}
上边的代码是我写的,大家看看有什么问题,然后运行的时候报错如下
log4j:WARN No appenders could be found for logger (org.apache.hadoop.conf.Configuration.deprecation).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Exception in thread "main" java.io.IOException: Cannot initialize Cluster. Please check your configuration for mapreduce.framework.name and the correspond server addresses.
        at org.apache.hadoop.mapreduce.Cluster.initialize(Cluster.java:120)
        at org.apache.hadoop.mapreduce.Cluster.<init>(Cluster.java:82)
        at org.apache.hadoop.mapreduce.Cluster.<init>(Cluster.java:75)
        at org.apache.hadoop.mapred.JobClient.init(JobClient.java:470)
        at org.apache.hadoop.mapred.JobClient.<init>(JobClient.java:449)
        at org.apache.hadoop.mapred.JobClient.runJob(JobClient.java:832)
        at com.eightsapce.mapreducetxt.WordCountTx.main(WordCountTx.java:96)

回复

使用道具 举报

lzw 发表于 2013-12-16 14:07:38

你是在eclipse中运行的还是用hadoop jar命令执行的。
回复

使用道具 举报

lzw 发表于 2013-12-16 14:19:24
我发现这个代码也不是haoop2.2.0的吧,应该是1.0之前的版本。
回复

使用道具 举报

有妳很满促 发表于 2013-12-17 12:05:50
lzw 发表于 2013-12-16 14:07
你是在eclipse中运行的还是用hadoop jar命令执行的。

在eclipse中运行
回复

使用道具 举报

有妳很满促 发表于 2013-12-17 12:06:38
lzw 发表于 2013-12-16 14:19
我发现这个代码也不是haoop2.2.0的吧,应该是1.0之前的版本。

我看网上说2.x和1.x代码一样啊
回复

使用道具 举报

lzw 发表于 2013-12-17 12:44:17
有妳很满促 发表于 2013-12-17 12:06
我看网上说2.x和1.x代码一样啊

上面举的例子既不是1.0.x,也不是2.0.x版本的。
回复

使用道具 举报

有妳很满促 发表于 2013-12-18 12:05:09
lzw 发表于 2013-12-17 12:44
上面举的例子既不是1.0.x,也不是2.0.x版本的。

那你有那个2.x的例子不
回复

使用道具 举报

lzw 发表于 2013-12-18 21:15:16
有妳很满促 发表于 2013-12-18 12:05
那你有那个2.x的例子不

新建3个类文件,代码依次如下:Mapper 类:MaxTemperatureMapper.java
  1. importjava.io.IOException;
  2. importorg.apache.hadoop.io.IntWritable;
  3. importorg.apache.hadoop.io.LongWritable;
  4. importorg.apache.hadoop.io.Text;
  5. importorg.apache.hadoop.mapreduce.Mapper;
  6. public class MaxTemperatureMapper extends Mapper<LongWritable, Text,Text, IntWritable> {
  7.          @Override
  8.          public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{                                    
  9.                    String line =value.toString();                              
  10.                    try {
  11.                             String year =line.substring(0,4);
  12.                             int airTemperature = Integer.parseInt(line.substring(5));            
  13.                             context.write(new Text(year),new IntWritable(airTemperature));                           
  14.                    } catch (Exception e) {
  15.                             System.out.print("Error in line:" + line);
  16.                    }                                 
  17.          }        
  18. }
复制代码

Reducer类:MaxTemperatureReducer.java
  1. importjava.io.IOException;
  2. importorg.apache.hadoop.io.IntWritable;
  3. importorg.apache.hadoop.io.Text;
  4. importorg.apache.hadoop.mapreduce.Reducer;
  5. public class MaxTemperatureReducer extendsReducer<Text,IntWritable,Text,IntWritable> {        
  6.          @Override
  7.          public void reduce(Text key, Iterable<IntWritable> values, Context context)throws IOException, InterruptedException        {
  8.                    int maxValue = Integer.MIN_VALUE;                  
  9.                    for(IntWritable value: values){
  10.                             maxValue = Math.max(maxValue,value.get());               
  11.                    }        
  12.                    context.write(key, new IntWritable(maxValue));                 
  13.          }
  14. }
复制代码


主类:MaxTemperatureDriver.java

  1. importorg.apache.hadoop.conf.Configuration;
  2. importorg.apache.hadoop.conf.Configured;
  3. importorg.apache.hadoop.fs.Path;
  4. importorg.apache.hadoop.io.IntWritable;
  5. importorg.apache.hadoop.io.Text;
  6. importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  7. importorg.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  8. importorg.apache.hadoop.mapreduce.Job;
  9. importorg.apache.hadoop.util.Tool;
  10. importorg.apache.hadoop.util.ToolRunner;
  11. public class MaxTemperatureDriver extends Configuredimplements Tool {
  12.          @SuppressWarnings("deprecation")
  13.          @Override
  14.          public int run(String[] args) throwsException {                  
  15.                    if (args.length != 2){
  16.                             System.err.printf("Usage: %s <input><output>",getClass().getSimpleName());
  17.                             ToolRunner.printGenericCommandUsage(System.err);
  18.                             return -1;                  
  19.                    }                  
  20.                    Configuration conf =getConf();               
  21.                    Job job = newJob(getConf());
  22.                    job.setJobName("Max Temperature");                  
  23.                    job.setJarByClass(getClass());
  24.                    FileInputFormat.addInputPath(job,new Path(args[0]));
  25.                    FileOutputFormat.setOutputPath(job,new Path(args[1]));                  
  26.                    job.setMapperClass(MaxTemperatureMapper.class);
  27.                    job.setReducerClass(MaxTemperatureReducer.class);                  
  28.                    job.setOutputKeyClass(Text.class);
  29.                    job.setOutputValueClass(IntWritable.class);                  
  30.                    return job.waitForCompletion(true)?0:1;                  
  31.          }
  32.         
  33.          public static void main(String[] args)throws Exception{
  34.                    int exitcode = ToolRunner.run(new MaxTemperatureDriver(), args);
  35.                    System.exit(exitcode);                  
  36.          }   
  37. }
复制代码

回复

使用道具 举报

有妳很满促 发表于 2013-12-27 11:26:35
lzw 发表于 2013-12-18 21:15
新建3个类文件,代码依次如下:Mapper 类:MaxTemperatureMapper.java

Reducer类:MaxTemperatureRedu ...

谢了,有时间试试
回复

使用道具 举报

12下一页
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条