分享

mapreduce 多种输入

hyj 发表于 2014-6-30 09:47:26 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 5 20669
问题导读:
1.如何多个路径,mapreduce如何实现?
2.多种输入,mapreduce如何实现?






1.多路径输入

1)FileInputFormat.addInputPath 多次调用加载不同路径

FileInputFormat.addInputPath(job, new Path("hdfs://RS5-112:9000/cs/path1"));
FileInputFormat.addInputPath(job, new Path("hdfs://RS5-112:9000/cs/path2"));

2)FileInputFormat.addInputPaths一次调用加载 多路径字符串用逗号隔开

FileInputFormat.addInputPaths(job, "hdfs://RS5-112:9000/cs/path1,hdfs://RS5-112:9000/cs/path2");

2.多种输入

MultipleInputs可以加载不同路径的输入文件,并且每个路径可用不同的maper
MultipleInputs.addInputPath(job, new Path("hdfs://RS5-112:9000/cs/path1"), TextInputFormat.class,MultiTypeFileInput1Mapper.class);

MultipleInputs.addInputPath(job, new Path("hdfs://RS5-112:9000/cs/path3"), TextInputFormat.class,MultiTypeFileInput3Mapper.class);

例子:
  1. package example;
  2. import java.io.IOException;
  3. import org.apache.hadoop.conf.Configuration;
  4. import org.apache.hadoop.fs.Path;
  5. import org.apache.hadoop.io.LongWritable;
  6. import org.apache.hadoop.io.Text;
  7. import org.apache.hadoop.mapreduce.Job;
  8. import org.apache.hadoop.mapreduce.Mapper;
  9. import org.apache.hadoop.mapreduce.Reducer;
  10. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  11. import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
  12. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
  13. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  14. /**
  15. * 多类型文件输入
  16. * @author lijl
  17. *
  18. */
  19. public class MultiTypeFileInputMR {
  20. static class MultiTypeFileInput1Mapper extends Mapper<LongWritable, Text, Text, Text>{
  21. public void map(LongWritable key,Text value,Context context){
  22. try {
  23. String[] str = value.toString().split("\\|");
  24. context.write(new Text(str[0]), new Text(str[1]));
  25. } catch (IOException e) {
  26. e.printStackTrace();
  27. } catch (InterruptedException e) {
  28. e.printStackTrace();
  29. }
  30. }
  31. }
  32. static class MultiTypeFileInput3Mapper extends Mapper<LongWritable, Text, Text, Text>{
  33. public void map(LongWritable key,Text value,Context context){
  34. try {
  35. String[] str = value.toString().split("");
  36. context.write(new Text(str[0]), new Text(str[1]));
  37. } catch (IOException e) {
  38. e.printStackTrace();
  39. } catch (InterruptedException e) {
  40. e.printStackTrace();
  41. }
  42. }
  43. }
  44. static class MultiTypeFileInputReducer extends Reducer<Text, Text, Text, Text>{
  45. public void reduce(Text key,Iterable<Text> values,Context context){
  46. try {
  47. for(Text value:values){
  48. context.write(key,value);
  49. }
  50. } catch (IOException e) {
  51. e.printStackTrace();
  52. } catch (InterruptedException e) {
  53. e.printStackTrace();
  54. }
  55. }
  56. }
  57. public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
  58. Configuration conf = new Configuration();
  59. conf.set("mapred.textoutputformat.separator", ",");
  60. Job job = new Job(conf,"MultiPathFileInput");
  61. job.setJarByClass(MultiTypeFileInputMR.class);
  62. FileOutputFormat.setOutputPath(job, new Path("hdfs://RS5-112:9000/cs/path6"));
  63. job.setMapOutputKeyClass(Text.class);
  64. job.setMapOutputValueClass(Text.class);
  65. job.setOutputKeyClass(Text.class);
  66. job.setOutputValueClass(Text.class);
  67. job.setReducerClass(MultiTypeFileInputReducer.class);
  68. job.setNumReduceTasks(1);
  69. MultipleInputs.addInputPath(job, new Path("hdfs://RS5-112:9000/cs/path1"), TextInputFormat.class,MultiTypeFileInput1Mapper.class);
  70. MultipleInputs.addInputPath(job, new Path("hdfs://RS5-112:9000/cs/path3"), TextInputFormat.class,MultiTypeFileInput3Mapper.class);
  71. System.exit(job.waitForCompletion(true)?0:1);
  72. }
  73. }
复制代码









已有(5)人评论

跳转到指定楼层
雨雪中的fish 发表于 2014-8-26 10:51:57
请问下楼主啊啊 FileInputFormat.addInputPath(job, new Path(args[0])); 后面的Path(args[0])是什么啊?
回复

使用道具 举报

hyj 发表于 2014-8-27 11:44:38
雨雪中的fish 发表于 2014-8-26 10:51
请问下楼主啊啊 FileInputFormat.addInputPath(job, new Path(args[0])); 后面的Path(args[0])是什么啊?

java程序有一个主方法,是这样的public static void main(String [] args)
你说的args[0]就是你用命令行编译运行java程序时,传入的第一个参数,比如你运行一个程序,代码如下:


  1. public class Test{
  2.     public static void main(String [] args){
  3.         for(int i=0;i<args.length;i++)
  4.             System.out.println(args[i]);
  5.     }
  6. }
复制代码



编译
javac Test.java
运行
java Test param1 param2 回车
你得到的结果是
param1
param2
也就是说args[0]是你传入的第一个参数args[2]是传入的第二个参数,以此类推。


回复

使用道具 举报

程序猿的无奈 发表于 2016-6-3 16:16:41
楼主,你知道hadoop2.5怎么实现多个mapper输入吗
回复

使用道具 举报

Kevin517 发表于 2016-11-12 14:35:03
请问楼主,我如果想对 .doc/ .pdf 的文件进行统计,该如何从 HDFS 上获取内容?

直接读的话,好像不行。。。
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条