分享

Mapreduce程序实例并部署在Hadoop2.2.0上运行

w123aw 发表于 2014-1-12 01:15:55 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 1 60001
本帖最后由 pig2 于 2014-2-20 00:18 编辑

在Hadoop2.2.0伪分布式上面运行我们写好的Mapreduce程序。先给出这个程序所依赖的Maven包:

  1. <dependencies>
  2.     <dependency>
  3.         <groupId>org.apache.hadoop</groupId>
  4.         <artifactId>hadoop-mapreduce-client-core</artifactId>
  5.         <version>2.1.1-beta</version>
  6.     </dependency>
  7.     <dependency>
  8.         <groupId>org.apache.hadoop</groupId>
  9.         <artifactId>hadoop-common</artifactId>
  10.         <version>2.1.1-beta</version>
  11.     </dependency>
  12.     <dependency>
  13.         <groupId>org.apache.hadoop</groupId>
  14.         <artifactId>hadoop-mapreduce-client-common</artifactId>
  15.         <version>2.1.1-beta</version>
  16.     </dependency>
  17.     <dependency>
  18.         <groupId>org.apache.hadoop</groupId>
  19.         <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
  20.         <version>2.1.1-beta</version>
  21.     </dependency>
  22. </dependencies>
复制代码
记得加上

  1. <dependency>
  2.         <groupId>org.apache.hadoop</groupId>
  3.         <artifactId>hadoop-mapreduce-client-common</artifactId>
  4.         <version>2.1.1-beta</version>
  5. </dependency>
  6. <dependency>
  7.         <groupId>org.apache.hadoop</groupId>
  8.         <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
  9.         <version>2.1.1-beta</version>
  10. </dependency>
复制代码
否则运行程序的时候将会出现以下的异常:

  1. Exception in thread "main" java.io.IOException: Cannot initialize Cluster.
  2.     Please check your configuration for mapreduce.framework.name and the
  3.     correspond server addresses.
  4.         at org.apache.hadoop.mapreduce.Cluster.initialize(Cluster.java:120)
  5.         at org.apache.hadoop.mapreduce.Cluster.<init>(Cluster.java:82)
  6.         at org.apache.hadoop.mapreduce.Cluster.<init>(Cluster.java:75)
  7.         at org.apache.hadoop.mapred.JobClient.init(JobClient.java:465)
  8.         at org.apache.hadoop.mapred.JobClient.<init>(JobClient.java:444)
  9.         at org.apache.hadoop.mapred.JobClient.runJob(JobClient.java:826)
  10.         at com.wyp.hadoop.MaxTemperature.main(MaxTemperature.java:41)
  11.         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  12.         at sun.reflect.NativeMethodAccessorImpl.invoke
  13.                            (NativeMethodAccessorImpl.java:57)
  14.         at sun.reflect.DelegatingMethodAccessorImpl.invoke
  15.                            (DelegatingMethodAccessorImpl.java:43)
  16.         at java.lang.reflect.Method.invoke(Method.java:606)
  17.         at com.intellij.rt.execution.application.AppMain.main(AppMain.java:120)
复制代码
好了,现在给出程序,代码如下:

  1. package com.wyp.hadoop;
  2. import org.apache.hadoop.io.IntWritable;
  3. import org.apache.hadoop.io.LongWritable;
  4. import org.apache.hadoop.io.Text;
  5. import org.apache.hadoop.mapred.*;
  6. import java.io.IOException;
  7. /**
  8. * User: wyp
  9. * Date: 13-10-25
  10. * Time: 下午3:26
  11. * Email:wyphao.2007@163.com
  12. */
  13. public class MaxTemperatureMapper extends MapReduceBase
  14.                       implements Mapper<LongWritable, Text,
  15.                       Text,IntWritable>{
  16.     private static final int MISSING = 9999;
  17.     @Override
  18.     public void map(LongWritable key, Text value,
  19.                       OutputCollector<Text, IntWritable> output,
  20.                       Reporter reporter) throws IOException {
  21.         String line = value.toString();
  22.         String year = line.substring(15, 19);
  23.         int airTemperature;
  24.         if(line.charAt(87) == '+'){
  25.             airTemperature = Integer.parseInt(line.substring(88, 92));
  26.         }else{
  27.             airTemperature = Integer.parseInt(line.substring(87, 92));
  28.         }
  29.         String quality = line.substring(92, 93);
  30.         if(airTemperature != MISSING && quality.matches("[01459]")){
  31.             output.collect(new Text(year), new IntWritable(airTemperature));
  32.         }
  33.     }
  34. }
  35. package com.wyp.hadoop;
  36. import org.apache.hadoop.io.IntWritable;
  37. import org.apache.hadoop.io.Text;
  38. import org.apache.hadoop.mapred.MapReduceBase;
  39. import org.apache.hadoop.mapred.OutputCollector;
  40. import org.apache.hadoop.mapred.Reducer;
  41. import org.apache.hadoop.mapred.Reporter;
  42. import java.io.IOException;
  43. import java.util.Iterator;
  44. /**
  45. * User: wyp
  46. * Date: 13-10-25
  47. * Time: 下午3:36
  48. * Email:wyphao.2007@163.com
  49. */
  50. public class MaxTemperatureReducer extends MapReduceBase
  51.                     implements Reducer<Text, IntWritable,
  52.                     Text, IntWritable> {
  53.     @Override
  54.     public void reduce(Text key, Iterator<IntWritable> values,
  55.                     OutputCollector<Text, IntWritable> output,
  56.                     Reporter reporter) throws IOException {
  57.         int maxValue = Integer.MIN_VALUE;
  58.         while (values.hasNext()){
  59.             maxValue = Math.max(maxValue, values.next().get());
  60.         }
  61.         output.collect(key, new IntWritable(maxValue));
  62.     }
  63. }
  64. package com.wyp.hadoop;
  65. import org.apache.hadoop.fs.Path;
  66. import org.apache.hadoop.io.IntWritable;
  67. import org.apache.hadoop.io.Text;
  68. import org.apache.hadoop.mapred.FileInputFormat;
  69. import org.apache.hadoop.mapred.FileOutputFormat;
  70. import org.apache.hadoop.mapred.JobClient;
  71. import org.apache.hadoop.mapred.JobConf;
  72. import java.io.IOException;
  73. /**
  74. * User: wyp
  75. * Date: 13-10-25
  76. * Time: 下午3:40
  77. * Email:wyphao.2007@163.com
  78. */
  79. public class MaxTemperature {
  80.     public static void main(String[] args) throws IOException {
  81.         if(args.length != 2){
  82.             System.err.println("Error!");
  83.             System.exit(1);
  84.         }
  85.         JobConf conf = new JobConf(MaxTemperature.class);
  86.         conf.setJobName("Max Temperature");
  87.         FileInputFormat.addInputPath(conf, new Path(args[0]));
  88.         FileOutputFormat.setOutputPath(conf, new Path(args[1]));
  89.         conf.setMapperClass(MaxTemperatureMapper.class);
  90.         conf.setReducerClass(MaxTemperatureReducer.class);
  91.         conf.setOutputKeyClass(Text.class);
  92.         conf.setOutputValueClass(IntWritable.class);
  93.         JobClient.runJob(conf);
  94.     }
  95. }
复制代码
 将上面的程序编译和打包成jar文件,然后开始在Hadoop2.2.0(本文假定用户都部署好了Hadoop2.2.0)上面部署了。下面主要讲讲如何去部署:

  首先,启动Hadoop2.2.0,命令如下:

  1. [wyp@wyp hadoop]$ sbin/start-dfs.sh
  2. [wyp@wyp hadoop]$ sbin/start-yarn.sh
复制代码
 如果你想看看Hadoop2.2.0是否运行成功,运行下面的命令去查看

  1. [wyp@wyp hadoop]$ jps
  2. 9582 Main
  3. 9684 RemoteMavenServer
  4. 16082 Jps
  5. 7011 DataNode
  6. 7412 ResourceManager
  7. 7528 NodeManager
  8. 7222 SecondaryNameNode
  9. 6832 NameNode
复制代码
 其中jps是jdk自带的一个命令,在jdk/bin目录下。如果你电脑上面出现了以上的几个进程(NameNode、SecondaryNameNode、NodeManager、ResourceManager、DataNode这五个进程必须出现!)说明你的Hadoop服务器启动成功了!现在来运行上面打包好的jar文件(这里为Hadoop.jar,其中/home/wyp/IdeaProjects/Hadoop/out/artifacts/Hadoop_jar/Hadoop.jar是它的绝对路径,不知道绝对路径是什么?那你好好去学学吧!),运行下面的命令:

  1. [wyp@wyp Hadoop_jar]$ /home/wyp/Downloads/hadoop/bin/hadoop jar \
  2.            /home/wyp/IdeaProjects/Hadoop/out/artifacts/Hadoop_jar/Hadoop.jar  \
  3.            com/wyp/hadoop/MaxTemperature \
  4.            /user/wyp/data.txt \
  5.            /user/wyp/result
复制代码
(上面是一条命令,由于太长了,所以我分行写,在实际情况中,请写一行!)其中,/home/wyp/Downloads/hadoop/bin/hadoop是hadoop的绝对路径,如果你在环境变量中配置好hadoop命令的路径就不需要这样写;com/wyp/hadoop/MaxTemperature是上面程序的main函数的入口;/user/wyp/data.txt是Hadoop文件系统(HDFS)中的绝对路径(注意:这里不是你Linux系统中的绝对路径!),为需要分析文件的路径(也就是input);/user/wyp/result是分析结果输出的绝对路径(注意:这里不是你Linux系统中的绝对路径!而是HDFS上面的路径!而且/user/wyp/result一定不能存在,否则会抛出异常!这是Hadoop的保护机制,你总不想你以前运行好几天的程序突然被你不小心给覆盖掉了吧?所以,如果/user/wyp/result存在,程序会抛出异常,很不错啊)。好了。输入上面的命令,应该会得到下面类似的输出:

  1. 13/10/28 15:20:44 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
  2. 13/10/28 15:20:44 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
  3. 13/10/28 15:20:45 WARN mapreduce.JobSubmitter: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
  4. 13/10/28 15:20:45 WARN mapreduce.JobSubmitter: No job jar file set.  User classes may not be found. See Job or Job#setJar(String).
  5. 13/10/28 15:20:45 INFO mapred.FileInputFormat: Total input paths to process : 1
  6. 13/10/28 15:20:46 INFO mapreduce.JobSubmitter: number of splits:2
  7. 13/10/28 15:20:46 INFO Configuration.deprecation: user.name is deprecated. Instead, use mapreduce.job.user.name
  8. 13/10/28 15:20:46 INFO Configuration.deprecation: mapred.output.value.class is deprecated. Instead, use mapreduce.job.output.value.class
  9. 13/10/28 15:20:46 INFO Configuration.deprecation: mapred.job.name is deprecated. Instead, use mapreduce.job.name
  10. 13/10/28 15:20:46 INFO Configuration.deprecation: mapred.input.dir is deprecated. Instead, use mapreduce.input.fileinputformat.inputdir
  11. 13/10/28 15:20:46 INFO Configuration.deprecation: mapred.output.dir is deprecated. Instead, use mapreduce.output.fileoutputformat.outputdir
  12. 13/10/28 15:20:46 INFO Configuration.deprecation: mapred.map.tasks is deprecated. Instead, use mapreduce.job.maps
  13. 13/10/28 15:20:46 INFO Configuration.deprecation: mapred.output.key.class is deprecated. Instead, use mapreduce.job.output.key.class
  14. 13/10/28 15:20:46 INFO Configuration.deprecation: mapred.working.dir is deprecated. Instead, use mapreduce.job.working.dir
  15. 13/10/28 15:20:46 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1382942307976_0008
  16. 13/10/28 15:20:47 INFO mapred.YARNRunner: Job jar is not present. Not adding any jar to the list of resources.
  17. 13/10/28 15:20:49 INFO impl.YarnClientImpl: Submitted application application_1382942307976_0008 to ResourceManager at /0.0.0.0:8032
  18. 13/10/28 15:20:49 INFO mapreduce.Job: The url to track the job: http://wyp:8088/proxy/application_1382942307976_0008/
  19. 13/10/28 15:20:49 INFO mapreduce.Job: Running job: job_1382942307976_0008
  20. 13/10/28 15:20:59 INFO mapreduce.Job: Job job_1382942307976_0008 running in uber mode : false
  21. 13/10/28 15:20:59 INFO mapreduce.Job:  map 0% reduce 0%
  22. 13/10/28 15:21:35 INFO mapreduce.Job:  map 100% reduce 0%
  23. 13/10/28 15:21:38 INFO mapreduce.Job:  map 0% reduce 0%
  24. 13/10/28 15:21:38 INFO mapreduce.Job: Task Id : attempt_1382942307976_0008_m_000000_0, Status : FAILED
  25. Error: java.lang.RuntimeException: Error in configuring object
  26.         at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:109)
  27.         at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:75)
  28.         at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:133)
  29.         at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:425)
  30.         at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
  31.         at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:162)
  32.         at java.security.AccessController.doPrivileged(Native Method)
  33.         at javax.security.auth.Subject.doAs(Subject.java:415)
  34.         at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
  35.         at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157)
  36. Caused by: java.lang.reflect.InvocationTargetException
  37.         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  38.         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
  39.         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  40.         at java.lang.reflect.Method.invoke(Method.java:606)
  41.         at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:106)
  42.         ... 9 more
  43. Caused by: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class com.wyp.hadoop.MaxTemperatureMapper1 not found
  44.         at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:1752)
  45.         at org.apache.hadoop.mapred.JobConf.getMapperClass(JobConf.java:1058)
  46.         at org.apache.hadoop.mapred.MapRunner.configure(MapRunner.java:38)
  47.         ... 14 more
  48. Caused by: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class com.wyp.hadoop.MaxTemperatureMapper1 not found
  49.         at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:1720)
  50.         at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:1744)
  51.         ... 16 more
  52. Caused by: java.lang.ClassNotFoundException: Class com.wyp.hadoop.MaxTemperatureMapper1 not found
  53.         at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:1626)
  54.         at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:1718)
  55.         ... 17 more
  56. Container killed by the ApplicationMaster.
  57. Container killed on request. Exit code is 143
复制代码
程序居然抛出异常(ClassNotFoundException)

经个人总结,这通常是由于以下几种原因造成的:
(1)你编写了一个java lib,封装成了jar,然后再写了一个Hadoop程序,调用这个jar完成mapper和reducer的编写
(2)你编写了一个Hadoop程序,期间调用了一个第三方java lib。
之后,你将自己的jar包或者第三方java包分发到各个TaskTracker的HADOOP_HOME目录下,运行你的JAVA程序,报了以上错误。
那怎么解决呢?一个笨重的方法是,在运行Hadoop作业的时候,先运行下面的命令:
  1. [wyp@wyp Hadoop_jar]$ export \
  2.     HADOOP_CLASSPATH=/home/wyp/IdeaProjects/Hadoop/out/artifacts/Hadoop_jar/
复制代码
其中,/home/wyp/IdeaProjects/Hadoop/out/artifacts/Hadoop_jar/是上面Hadoop.jar文件所在的目录。好了,现在再运行一下Hadoop作业命令:

有一个比较推荐的方法,就是在提交作业的时候加上-libjars参数,后面跟着需要的类库的绝对路径。
  1. [wyp@wyp Hadoop_jar]$ hadoop jar /home/wyp/IdeaProjects/Hadoop/out/artifacts/Hadoop_jar/Hadoop.jar  com/wyp/hadoop/MaxTemperature /user/wyp/data.txt /user/wyp/result
  2. 13/10/28 15:34:16 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
  3. 13/10/28 15:34:16 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
  4. 13/10/28 15:34:17 WARN mapreduce.JobSubmitter: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
  5. 13/10/28 15:34:17 INFO mapred.FileInputFormat: Total input paths to process : 1
  6. 13/10/28 15:34:17 INFO mapreduce.JobSubmitter: number of splits:2
  7. 13/10/28 15:34:17 INFO Configuration.deprecation: user.name is deprecated. Instead, use mapreduce.job.user.name
  8. 13/10/28 15:34:17 INFO Configuration.deprecation: mapred.jar is deprecated. Instead, use mapreduce.job.jar
  9. 13/10/28 15:34:17 INFO Configuration.deprecation: mapred.output.value.class is deprecated. Instead, use mapreduce.job.output.value.class
  10. 13/10/28 15:34:17 INFO Configuration.deprecation: mapred.job.name is deprecated. Instead, use mapreduce.job.name
  11. 13/10/28 15:34:17 INFO Configuration.deprecation: mapred.input.dir is deprecated. Instead, use mapreduce.input.fileinputformat.inputdir
  12. 13/10/28 15:34:17 INFO Configuration.deprecation: mapred.output.dir is deprecated. Instead, use mapreduce.output.fileoutputformat.outputdir
  13. 13/10/28 15:34:17 INFO Configuration.deprecation: mapred.map.tasks is deprecated. Instead, use mapreduce.job.maps
  14. 13/10/28 15:34:17 INFO Configuration.deprecation: mapred.output.key.class is deprecated. Instead, use mapreduce.job.output.key.class
  15. 13/10/28 15:34:17 INFO Configuration.deprecation: mapred.working.dir is deprecated. Instead, use mapreduce.job.working.dir
  16. 13/10/28 15:34:18 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1382942307976_0009
  17. 13/10/28 15:34:18 INFO impl.YarnClientImpl: Submitted application application_1382942307976_0009 to ResourceManager at /0.0.0.0:8032
  18. 13/10/28 15:34:18 INFO mapreduce.Job: The url to track the job: http://wyp:8088/proxy/application_1382942307976_0009/
  19. 13/10/28 15:34:18 INFO mapreduce.Job: Running job: job_1382942307976_0009
  20. 13/10/28 15:34:26 INFO mapreduce.Job: Job job_1382942307976_0009 running in uber mode : false
  21. 13/10/28 15:34:26 INFO mapreduce.Job:  map 0% reduce 0%
  22. 13/10/28 15:34:41 INFO mapreduce.Job:  map 50% reduce 0%
  23. 13/10/28 15:34:53 INFO mapreduce.Job:  map 100% reduce 0%
  24. 13/10/28 15:35:17 INFO mapreduce.Job:  map 100% reduce 100%
  25. 13/10/28 15:35:18 INFO mapreduce.Job: Job job_1382942307976_0009 completed successfully
  26. 13/10/28 15:35:18 INFO mapreduce.Job: Counters: 43
  27.         File System Counters
  28.                 FILE: Number of bytes read=144425
  29.                 FILE: Number of bytes written=524725
  30.                 FILE: Number of read operations=0
  31.                 FILE: Number of large read operations=0
  32.                 FILE: Number of write operations=0
  33.                 HDFS: Number of bytes read=1777598
  34.                 HDFS: Number of bytes written=18
  35.                 HDFS: Number of read operations=9
  36.                 HDFS: Number of large read operations=0
  37.                 HDFS: Number of write operations=2
  38.         Job Counters
  39.                 Launched map tasks=2
  40.                 Launched reduce tasks=1
  41.                 Data-local map tasks=2
  42.                 Total time spent by all maps in occupied slots (ms)=38057
  43.                 Total time spent by all reduces in occupied slots (ms)=24800
  44.         Map-Reduce Framework
  45.                 Map input records=13130
  46.                 Map output records=13129
  47.                 Map output bytes=118161
  48.                 Map output materialized bytes=144431
  49.                 Input split bytes=182
  50.                 Combine input records=0
  51.                 Combine output records=0
  52.                 Reduce input groups=2
  53.                 Reduce shuffle bytes=144431
  54.                 Reduce input records=13129
  55.                 Reduce output records=2
  56.                 Spilled Records=26258
  57.                 Shuffled Maps =2
  58.                 Failed Shuffles=0
  59.                 Merged Map outputs=2
  60.                 GC time elapsed (ms)=321
  61.                 CPU time spent (ms)=5110
  62.                 Physical memory (bytes) snapshot=552824832
  63.                 Virtual memory (bytes) snapshot=1228738560
  64.                 Total committed heap usage (bytes)=459800576
  65.         Shuffle Errors
  66.                 BAD_ID=0
  67.                 CONNECTION=0
  68.                 IO_ERROR=0
  69.                 WRONG_LENGTH=0
  70.                 WRONG_MAP=0
  71.                 WRONG_REDUCE=0
  72.         File Input Format Counters
  73.                 Bytes Read=1777416
  74.         File Output Format Counters
  75.                 Bytes Written=18
复制代码
到这里,程序就成功运行了。那怎么查看刚刚程序运行的结果呢?运行下面的命令:
  1. [wyp@wyp Hadoop_jar]$ hadoop fs -ls /user/wyp
  2. Found 2 items
  3. -rw-r--r--   1 wyp supergroup    1777168 2013-10-25 17:44 /user/wyp/data.txt
  4. drwxr-xr-x   - wyp supergroup          0 2013-10-28 15:35 /user/wyp/result
  5. [wyp@wyp Hadoop_jar]$ hadoop fs -ls /user/wyp/result
  6. Found 2 items
  7. -rw-r--r--   1 wyp supergroup    0 2013-10-28 15:35 /user/wyp/result/_SUCCESS
  8. -rw-r--r--   1 wyp supergroup  18 2013-10-28 15:35 /user/wyp/result/part-00000
  9. [wyp@wyp Hadoop_jar]$ hadoop fs -cat  /user/wyp/result/part-00000
  10. 1901        317
  11. 1902        244
复制代码
到此,你自己写好的一个Mapreduce程序终于成功运行了!
  附程序测试的数据的下载地址: data.zip (144.46 KB, 下载次数: 47)

已有(1)人评论

跳转到指定楼层
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条