分享

hadoop CombineFileInputFormat使用

lzw 2013-12-29 22:31:18 发表于 实操演练 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 0 9080
小文件与CombineFileInputFormat,相对于大批量的小文件,hadoop更适合处理少量的小文件。原因是FileInputFormat 生成的InputSplit是一个文件或者该文件的一部分。如果文件很小(比HDFS的块要小很多),并且文件数据很多,那么每次map任务只处理很少的输入数据,每次操作都会造成额外的开销。请比较分割成16个64MB的1GB的一个文件与100KB的10000个文件。10000个文件每个都需要使用一个map操作,作业时间比一个文件上的16个map操作慢上几十甚至几百倍。CombineFileInputFormat可以缓解这个问题,它是针对小文件而设计的,CombineFileInputFormat把多个文件打包到一个分片以便每个mapper可以处理更多的数据。CombineFileInputFormat是一个抽象类,没有提供实体类,所以使用的时候需要一些额外的工作。需要创建CombineFileInputFormat的具体子类,在旧的API实现getRecordReader()方法,新的API中实现createRecordReader()方法。下面的例子是实现新的API createRecordReader()的。
FileLineWritable 类:
  1. package com.duplicate.self;
  2. import java.io.DataInput;
  3. import java.io.DataOutput;
  4. import java.io.IOException;
  5. import org.apache.hadoop.io.Text;
  6. import org.apache.hadoop.io.WritableComparable;
  7. public class FileLineWritable implements WritableComparable<FileLineWritable> {
  8.        
  9.         public long offset;
  10.         public String fileName;
  11.        
  12.         public void readFields(DataInput in) throws IOException {
  13.                 this.offset = in.readLong();
  14.                 this.fileName = Text.readString(in);
  15.         }
  16.         public void write(DataOutput out) throws IOException {
  17.                 out.writeLong(this.offset);
  18.                 Text.writeString(out,this.fileName);
  19.         }
  20.         public int compareTo(FileLineWritable that) {
  21.                 int cmp = this.fileName.compareTo(that.fileName);
  22.                 if(cmp != 0){
  23.                         return cmp;
  24.                 }
  25.                 return (int)Math.signum((double)(this.offset - that.offset));
  26.         }
  27.        
  28.         @Override
  29.         public int hashCode(){
  30.                 final int prime = 31;
  31.                 int result = 1;
  32.                 result = prime * result + ((fileName == null) ? 0:this.fileName.hashCode());
  33.                 result = prime * result + (int)(this.offset ^ (this.offset >>> 2));
  34.                 return result;
  35.         }
  36.        
  37.         @Override
  38.         public boolean equals(Object obj){
  39.                 if(this == obj){
  40.                         return true;
  41.                 }
  42.                
  43.                 if(obj == null){
  44.                         return false;
  45.                 }
  46.                
  47.                 if(this.getClass() != obj.getClass()){
  48.                         return false;
  49.                 }
  50.                
  51.                 FileLineWritable other = (FileLineWritable)obj;
  52.                 if(this.fileName == null){
  53.                         if(other.fileName != null){
  54.                                 return false;
  55.                         }
  56.                 }else if(!this.fileName.equals(other.fileName)){
  57.                         return false;
  58.                 }
  59.                
  60.                 if(this.offset != other.offset){
  61.                         return false;
  62.                 }
  63.                
  64.                 return true;
  65.         }
  66. }
复制代码
MyRecordReader 类的
  1. package com.duplicate.self;
  2. import java.io.IOException;
  3. import org.apache.hadoop.fs.FSDataInputStream;
  4. import org.apache.hadoop.fs.FileSystem;
  5. import org.apache.hadoop.fs.Path;
  6. import org.apache.hadoop.io.Text;
  7. import org.apache.hadoop.mapreduce.InputSplit;
  8. import org.apache.hadoop.mapreduce.RecordReader;
  9. import org.apache.hadoop.mapreduce.TaskAttemptContext;
  10. import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
  11. import org.apache.hadoop.util.LineReader;
  12. public class MyRecordReader extends RecordReader<FileLineWritable, Text> {
  13.         private long startOffset;
  14.         private long end;
  15.         private long pos;
  16.         private FileSystem fs;
  17.         private Path path;
  18.         private FileLineWritable key;
  19.         private Text value;
  20.        
  21.         private FSDataInputStream fileIn;
  22.         private LineReader reader;
  23.        
  24.         public MyRecordReader(CombineFileSplit split,TaskAttemptContext context,Integer index) throws IOException{
  25.                 this.path = split.getPath(index);
  26.                 this.fs = this.path.getFileSystem(context.getConfiguration());
  27.                 this.startOffset = split.getOffset(index);
  28.                 this.end = this.startOffset + split.getLength();
  29.                
  30.                 fileIn = this.fs.open(this.path);
  31.                 reader = new LineReader(fileIn);
  32.                 this.pos = this.startOffset;
  33.         }
  34.        
  35.         @Override
  36.         public void close() throws IOException {
  37.                 // TODO Auto-generated method stub
  38.                
  39.         }
  40.         @Override
  41.         public FileLineWritable getCurrentKey() throws IOException,
  42.                         InterruptedException {
  43.                 // TODO Auto-generated method stub
  44.                 return key;
  45.         }
  46.         @Override
  47.         public Text getCurrentValue() throws IOException, InterruptedException {
  48.                 // TODO Auto-generated method stub
  49.                 return value;
  50.         }
  51.         @Override
  52.         public float getProgress() throws IOException, InterruptedException {
  53.                 if(this.startOffset == this.end){
  54.                         return 0;
  55.                 }
  56.                
  57.                 return Math.min(1.0f, (this.pos - this.startOffset)/(float)(this.end - this.startOffset));
  58.         }
  59.         @Override
  60.         public void initialize(InputSplit arg0, TaskAttemptContext arg1)
  61.                         throws IOException, InterruptedException {
  62.                 // TODO Auto-generated method stub
  63.                
  64.         }
  65.         @Override
  66.         public boolean nextKeyValue() throws IOException, InterruptedException {
  67.                 if(key == null){
  68.                         key = new FileLineWritable();
  69.                         key.fileName = path.getName();
  70.                 }
  71.                
  72.                 key.offset = pos;
  73.                 if(null == value){
  74.                         value = new Text();
  75.                 }
  76.                
  77.                 int newSize = 0;
  78.                 if(pos < end){
  79.                         newSize = reader.readLine(value);
  80.                         pos += newSize;
  81.                 }
  82.                
  83.                 if(newSize == 0){
  84.                         key = null;
  85.                         value = null;
  86.                         return false;
  87.                 }else{
  88.                         return true;
  89.                 }
  90.         }
  91. }
复制代码
MyCombineFileInputFormat 类:
  1. package com.duplicate.self;
  2. import java.io.IOException;
  3. import org.apache.hadoop.fs.Path;
  4. import org.apache.hadoop.io.Text;
  5. import org.apache.hadoop.mapreduce.InputSplit;
  6. import org.apache.hadoop.mapreduce.JobContext;
  7. import org.apache.hadoop.mapreduce.RecordReader;
  8. import org.apache.hadoop.mapreduce.TaskAttemptContext;
  9. import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
  10. import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
  11. import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
  12. public class MyCombineFileInputFormat extends CombineFileInputFormat<FileLineWritable, Text> {
  13.         @Override
  14.         public RecordReader<FileLineWritable, Text> createRecordReader(InputSplit split,
  15.                         TaskAttemptContext context) throws IOException {
  16.                 return new CombineFileRecordReader<FileLineWritable,Text>((CombineFileSplit)split,context,MyRecordReader.class);
  17.         }
  18.        
  19.          @Override
  20.           protected boolean isSplitable(JobContext context, Path file){
  21.             return false;
  22.           }
  23. }
复制代码
job配置
  1. import org.apache.hadoop.mapreduce.Job;
  2. // standard hadoop conf
  3. Job job = new Job(getConf());
  4. job.setInputFormatClass(MyCombineFileInputFormat .class);
  5. job.setMapperClass(Mapper.class);
  6. job.setNumReduceTasks(0); // map only
复制代码
来自群组: hadoop技术组
欢迎加入about云群9037177932227315139327136 ,云计算爱好者群,亦可关注about云腾讯认证空间||关注本站微信

没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条