分享

Hadoop MapReduce中如何处理跨行Block和InputSplit

howtodown 发表于 2014-3-4 00:04:20 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 0 8507
本帖最后由 howtodown 于 2014-3-4 00:06 编辑
可以带着下面问题来阅读:
1. Hadoop的一个Block默认是64M,那么对于一个记录行形式的文本,会不会造成一行记录被分到两个Block当中?
2. 在把文件从Block中读取出来进行切分时,会不会造成一行记录被分成两个InputSplit,InputSplit的Mapper会不会得出不正确的结果?

对于上面的两个问题,首先要明确两个概念:Block和InputSplit:

1. Block是HDFS存储文件的单位(默认是64M);

2. InputSplit是MapReduce对文件进行处理和运算的输入单位,只是一个逻辑概念,每个InputSplit并没有对文件实际的切割,只是记录了要处理的数据的位置(包括文件的path和hosts)和长度(由start和length决定)。

因此以行记录形式的文本,可能存在一行记录被划分到不同的Block,甚至不同的DataNode上去。通过分析FileInputFormat里面的getSplits方法,可以得出,某一行记录同样也可能被划分到不同的InputSplit。

下面以hadoop-0.22.0源码进行分析

org.apache.hadoop.mapred.FileInputFormat:
  1. public InputSplit[] getSplits(JobConf job, int numSplits)
  2.     throws IOException {
  3.     FileStatus[] files = listStatus(job);
  4.    
  5.     // Save the number of input files for metrics/loadgen
  6.     job.setLong(NUM_INPUT_FILES, files.length);
  7.     long totalSize = 0;                           // compute total size
  8.     for (FileStatus file: files) {                // check we have valid files
  9.       if (file.isDirectory()) {
  10.         throw new IOException("Not a file: "+ file.getPath());
  11.       }
  12.       totalSize += file.getLen();
  13.     }
  14.     long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
  15.     long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
  16.       FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);
  17.     // generate splits
  18.     ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
  19.     NetworkTopology clusterMap = new NetworkTopology();
  20.     for (FileStatus file: files) {
  21.       Path path = file.getPath();
  22.       FileSystem fs = path.getFileSystem(job);
  23.       long length = file.getLen();
  24.       BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
  25.       if ((length != 0) && isSplitable(fs, path)) {
  26.         long blockSize = file.getBlockSize();
  27.         long splitSize = computeSplitSize(goalSize, minSize, blockSize);
  28.         long bytesRemaining = length;
  29.         while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
  30.           String[] splitHosts = getSplitHosts(blkLocations,
  31.               length-bytesRemaining, splitSize, clusterMap);
  32.           splits.add(makeSplit(path, length-bytesRemaining, splitSize,
  33.                                splitHosts));
  34.           bytesRemaining -= splitSize;
  35.         }
  36.         
  37.         if (bytesRemaining != 0) {
  38.           splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
  39.                      blkLocations[blkLocations.length-1].getHosts()));
  40.         }
  41.       } else if (length != 0) {
  42.         String[] splitHosts = getSplitHosts(blkLocations,0,length,clusterMap);
  43.         splits.add(makeSplit(path, 0, length, splitHosts));
  44.       } else {
  45.         //Create empty hosts array for zero length files
  46.         splits.add(makeSplit(path, 0, length, new String[0]));
  47.       }
  48.     }
  49.     LOG.debug("Total # of splits: " + splits.size());
  50.     return splits.toArray(new FileSplit[splits.size()]);
  51.   }
复制代码
从上面的代码可以看出,对文件进行切分其实很简单:获取文件在HDFS上的路径和Block信息,然后根据splitSize对文件进行切分,splitSize = computeSplitSize(goalSize, minSize, blockSize);goalSize,minSize,blockSize都可以配置,默认splitSize 就等于blockSize的默认值(64m)。

FileInputFormat对文件的切分是严格按照偏移量来的,因此一行记录比较长的话,可能被切分到不同的InputSplit。 但这并不会对Map造成影响,尽管一行记录可能被拆分到不同的InputSplit,但是与FileInputFormat关联的RecordReader被设计的足够健壮,当一行记录跨InputSplit时,其能够到读取不同的InputSplit,直到把这一行记录读取完成 。
org.apache.hadoop.mapred.TextInputFormat:
  1. public RecordReader<LongWritable, Text> getRecordReader(
  2.                                           InputSplit genericSplit, JobConf job,
  3.                                           Reporter reporter)
  4.     throws IOException {
  5.    
  6.     reporter.setStatus(genericSplit.toString());
  7.     return new LineRecordReader(job, (FileSplit) genericSplit);
  8.   }
复制代码
org.apache.hadoop.mapred.LineRecordReader :
  1. /** Read a line. */
  2.   public synchronized boolean next(LongWritable key, Text value)
  3.     throws IOException {
  4.     // We always read one extra line, which lies outside the upper
  5.     // split limit i.e. (end - 1)
  6.     while (getFilePosition() <= end) {
  7.       key.set(pos);
  8.       int newSize = in.readLine(value, maxLineLength,
  9.           Math.max(maxBytesToConsume(pos), maxLineLength));
  10.       if (newSize == 0) {
  11.         return false;
  12.       }
  13.       pos += newSize;
  14.       if (newSize < maxLineLength) {
  15.         return true;
  16.       }
  17.       // line too long. try again
  18.       LOG.info("Skipped line of size " + newSize + " at pos " + (pos - newSize));
  19.     }
  20.     return false;
  21.   }
复制代码
对于跨InputSplit的行,LineRecordReader会自动跨InputSplit去读取 。


如果一行记录L跨越了A,B两个InputSplit,读A的时候已经读取了跨越A,B的这条记录L,那么对B这个InputSplit读取的时候,如何做到不读取L这条记录在B中的部分呢?

org.apache.hadoop.mapred.LineRecordReader:
  1. // If this is not the first split, we always throw away first record
  2.     // because we always (except the last split) read one extra line in
  3.     // next() method.
  4.     if (start != 0) {
  5.       start += in.readLine(new Text(), 0, maxBytesToConsume(start));
  6.     }
复制代码
如果不是first split,则会丢弃第一个record,避免了重复读取的问题。



没找到任何评论,期待你打破沉寂

howtodown 发表于 2014-3-4 00:05:50
上面这个问题,我们在实际的操作中可能碰不到,但是这个被忽略的问题,很容易在面试的时候被问到。
从上面摘录了一部分认为比较好的:

FileInputFormat对文件的切分是严格按照偏移量来的,因此一行记录比较长的话,可能被切分到不同的InputSplit。 但这并不会对Map造成影响,尽管一行记录可能被拆分到不同的InputSplit,但是与FileInputFormat关联的RecordReader被设计的足够健壮,当一行记录跨InputSplit时,其能够到读取不同的InputSplit,直到把这一行记录读取完成 。




回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条