
How to change the file names and output directories of MapReduce (reduce) output

Posted by howtodown on 2014-10-27 10:52:01
Guiding questions:
1. How do you change the file names produced after the reduce phase?
2. Which class implements custom file output?
The requirement: raw log files need to be cleaned with Hadoop and then written out, per business line, to different directories, so that each department's business line can consume its own data.

This calls for MultipleOutputFormat and MultipleOutputs, which are used to implement custom multi-directory and multi-file output.

Note that usage differs before and after Hadoop 0.21.x:

Before 0.21, the API provided org.apache.hadoop.mapred.lib.MultipleOutputFormat and org.apache.hadoop.mapred.lib.MultipleOutputs; from 0.21 onward the API is org.apache.hadoop.mapreduce.lib.output.MultipleOutputs.

The new API merges the functionality of the two old classes, and MultipleOutputFormat is gone.

This article gives code for both the old and the new API.

1. The old API (before 0.21.x):


import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MultiFile extends Configured implements Tool {

    // Identity mapper: pass every input line through unchanged,
    // keyed by NullWritable.
    public static class MapClass extends MapReduceBase implements
            Mapper<LongWritable, Text, NullWritable, Text> {
        @Override
        public void map(LongWritable key, Text value,
                OutputCollector<NullWritable, Text> output, Reporter reporter)
                throws IOException {
            output.collect(NullWritable.get(), value);
        }
    }

    // MultipleTextOutputFormat extends MultipleOutputFormat; overriding
    // generateFileNameForKeyValue routes records to different output files.
    // Key is NullWritable, value is Text.
    public static class PartitionByCountryMTOF extends
            MultipleTextOutputFormat<NullWritable, Text> {
        @Override
        protected String generateFileNameForKeyValue(NullWritable key,
                Text value, String filename) {
            String[] arr = value.toString().split(",", -1);
            // extract the country code, stripping the quotes: "AD" -> AD
            String country = arr[4].substring(1, 3);
            return country + "/" + filename; // one subdirectory per country
        }
    }

    // No reducer is used here: this is a map-only job (setNumReduceTasks(0)).

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        JobConf job = new JobConf(conf, MultiFile.class);
        Path in = new Path(args[0]);
        Path out = new Path(args[1]);
        FileInputFormat.setInputPaths(job, in);
        FileOutputFormat.setOutputPath(job, out);
        job.setJobName("MultiFile");
        job.setMapperClass(MapClass.class);
        job.setInputFormat(TextInputFormat.class);
        job.setOutputFormat(PartitionByCountryMTOF.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);
        job.setNumReduceTasks(0);
        JobClient.runJob(job);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new MultiFile(), args);
        System.exit(res);
    }
}
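The key hook here is generateFileNameForKeyValue(): MultipleOutputFormat calls it once per record, and the string it returns is used as the record's output path relative to the job output directory (the default implementation simply returns the leaf name it is handed, e.g. part-00000). Returning country + "/" + filename therefore routes each record into a per-country subdirectory.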

Test data and results:
hadoop fs -cat /tmp/multiTest.txt
5765303,1998,14046,1996,"AD","",,1,12,42,5,59,11,1,0.4545,0,0,1,67.3636,,,,
5785566,1998,14088,1996,"AD","",,1,9,441,6,69,3,0,1,,0.6667,,4.3333,,,,
5894770,1999,14354,1997,"AD","",,1,,82,5,51,4,0,1,,0.625,,7.5,,,,
5765303,1998,14046,1996,"CN","",,1,12,42,5,59,11,1,0.4545,0,0,1,67.3636,,,,
5785566,1998,14088,1996,"CN","",,1,9,441,6,69,3,0,1,,0.6667,,4.3333,,,,
5894770,1999,14354,1997,"CN","",,1,,82,5,51,4,0,1,,0.625,,7.5,,,,
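To try it out, a run might look like the following (the jar name multifile.jar and the output path /tmp/multiOut are made up for illustration):

  hadoop jar multifile.jar MultiFile /tmp/multiTest.txt /tmp/multiOut

With the test data above, this map-only job should leave one subdirectory per country code under the output directory, roughly:

  /tmp/multiOut/AD/part-00000
  /tmp/multiOut/CN/part-00000

each file containing the corresponding "AD" or "CN" lines.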


2. The new API (0.21.x and later):
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class TestwithMultipleOutputs extends Configured implements Tool {

  public static class MapClass extends Mapper<LongWritable, Text, Text, IntWritable> {

    private MultipleOutputs<Text, IntWritable> mos;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
      mos = new MultipleOutputs<Text, IntWritable>(context);
    }

    @Override
    public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      String line = value.toString();
      String[] tokens = line.split("-");
      // (1) write to the named output "MOSInt"
      mos.write("MOSInt", new Text(tokens[0]), new IntWritable(Integer.parseInt(tokens[1])));
      // (2) write to the named output "MOSText"
      mos.write("MOSText", new Text(tokens[0]), new Text(tokens[2]));
      // (3) same named output, but redirected to a chosen file/directory (baseOutputPath)
      mos.write("MOSText", new Text(tokens[0]), new Text(line), tokens[0] + "/");
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
      mos.close();
    }
  }

  @Override
  public int run(String[] args) throws Exception {
    Configuration conf = getConf();
    Job job = new Job(conf, "word count with MultipleOutputs");
    job.setJarByClass(TestwithMultipleOutputs.class);
    Path in = new Path(args[0]);
    Path out = new Path(args[1]);
    FileInputFormat.setInputPaths(job, in);
    FileOutputFormat.setOutputPath(job, out);
    job.setMapperClass(MapClass.class);
    job.setNumReduceTasks(0);
    // register the named outputs used by the mos.write(...) calls above
    MultipleOutputs.addNamedOutput(job, "MOSInt", TextOutputFormat.class, Text.class, IntWritable.class);
    MultipleOutputs.addNamedOutput(job, "MOSText", TextOutputFormat.class, Text.class, Text.class);
    return job.waitForCompletion(true) ? 0 : 1;
  }

  public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new Configuration(), new TestwithMultipleOutputs(), args);
    System.exit(res);
  }
}
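The three mos.write() calls marked (1)-(3) use the two overloads of MultipleOutputs.write(). write(namedOutput, key, value) sends the record to the files of that named output; in a map-only job these come out as MOSInt-m-xxxxx and MOSText-m-xxxxx. write(namedOutput, key, value, baseOutputPath) additionally takes a base path, so passing tokens[0] + "/" places the record under one subdirectory per first field of the input line. Each named output must first be registered via MultipleOutputs.addNamedOutput(), as done in run().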



Test data:
abc-1232-hdf
abc-123-rtd
ioj-234-grjth
ntg-653-sdgfvd
kju-876-btyun
bhm-530-bhyt
hfter-45642-bhgf
bgrfg-8956-fmgh
jnhdf-8734-adfbgf
ntg-68763-nfhsdf
ntg-98634-dehuy
hfter-84567-drhuk
Result screenshot (output written to /test/testMOSout): (image omitted)
PS: one problem encountered:
If mos.close() is not called, the job fails at runtime with an exception like:
  12/05/21 20:12:47 WARN hdfs.DFSClient: DataStreamer Exception:
  org.apache.hadoop.ipc.RemoteException: org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException: No lease on
  /test/mosreduce/_temporary/_attempt_local_0001_r_000000_0/h-r-00000 File does not exist. [Lease. Holder: DFSClient_-352105532, pendingcreates: 5]
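The likely cause: each named output holds an open RecordWriter on HDFS, and if mos.close() never runs, those files are still held open (the client still owns the lease) while the framework cleans up the task's temporary directory, which surfaces as the LeaseExpiredException above. A slightly more defensive cleanup(), sketched as a variant of the mapper above:

  // Sketch: close MultipleOutputs even if the task failed part-way through,
  // so the HDFS lease on the named-output files is always released.
  @Override
  protected void cleanup(Context context) throws IOException, InterruptedException {
    if (mos != null) {
      mos.close();
    }
  }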
Comments (2):

536528395 posted on 2015-2-4 16:01:44:
Have you tried this with multiple input files? When I tried it, the multiple files were not combined in the computation.

ainubis posted on 2015-3-28 21:53:50:
Thanks for sharing.
