分享

hadoop Map如何读取数据块

gwgyk 发表于 2014-11-15 14:26:26 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 8 33245
由于工作需要,得仔细了解下Map Task读取数据块的流程,但是我从Child的main()方法中开始一路跟踪到MapTask.java中的runNewMapper()方法时,却不知道这个方法是从哪儿开始读取数据块的,麻烦大家帮我看看,谢谢啦
  1. @SuppressWarnings("unchecked")
  2.   private <INKEY,INVALUE,OUTKEY,OUTVALUE>
  3.   void runNewMapper(final JobConf job,
  4.                     final TaskSplitIndex splitIndex,
  5.                     final TaskUmbilicalProtocol umbilical,
  6.                     TaskReporter reporter
  7.                     ) throws IOException, ClassNotFoundException,
  8.                              InterruptedException {
  9.     // make a task context so we can get the classes
  10.     org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
  11.       new org.apache.hadoop.mapreduce.TaskAttemptContext(job, getTaskID());
  12.     // make a mapper
  13.     org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =
  14.       (org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)
  15.         ReflectionUtils.newInstance(taskContext.getMapperClass(), job);
  16.     // make the input format
  17.     org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat =
  18.       (org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>)
  19.         ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);
  20.     // rebuild the input split
  21.     org.apache.hadoop.mapreduce.InputSplit split = null;
  22.     split = getSplitDetails(new Path(splitIndex.getSplitLocation()),
  23.         splitIndex.getStartOffset());
  24.     LOG.info("Processing split: " + split);
  25.     org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =
  26.       new NewTrackingRecordReader<INKEY,INVALUE>
  27.           (split, inputFormat, reporter, job, taskContext);
  28.     job.setBoolean("mapred.skip.on", isSkipping());
  29.     org.apache.hadoop.mapreduce.RecordWriter output = null;
  30.     org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context
  31.          mapperContext = null;
  32.     try {
  33.       Constructor<org.apache.hadoop.mapreduce.Mapper.Context> contextConstructor =
  34.         org.apache.hadoop.mapreduce.Mapper.Context.class.getConstructor
  35.         (new Class[]{org.apache.hadoop.mapreduce.Mapper.class,
  36.                      Configuration.class,
  37.                      org.apache.hadoop.mapreduce.TaskAttemptID.class,
  38.                      org.apache.hadoop.mapreduce.RecordReader.class,
  39.                      org.apache.hadoop.mapreduce.RecordWriter.class,
  40.                      org.apache.hadoop.mapreduce.OutputCommitter.class,
  41.                      org.apache.hadoop.mapreduce.StatusReporter.class,
  42.                      org.apache.hadoop.mapreduce.InputSplit.class});
  43.       // get an output object
  44.       if (job.getNumReduceTasks() == 0) {
  45.          output =
  46.            new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
  47.       } else {
  48.         output = new NewOutputCollector(taskContext, job, umbilical, reporter);
  49.       }
  50.       mapperContext = contextConstructor.newInstance(mapper, job, getTaskID(),
  51.                                                      input, output, committer,
  52.                                                      reporter, split);
  53.       input.initialize(split, mapperContext);
  54.       mapper.run(mapperContext);
  55.       input.close();
  56.       input = null;
  57.       output.close(mapperContext);
  58.       output = null;
  59.     } catch (NoSuchMethodException e) {
  60.       throw new IOException("Can't find Context constructor", e);
  61.     } catch (InstantiationException e) {
  62.       throw new IOException("Can't create Context", e);
  63.     } catch (InvocationTargetException e) {
  64.       throw new IOException("Can't invoke Context constructor", e);
  65.     } catch (IllegalAccessException e) {
  66.       throw new IOException("Can't invoke Context constructor", e);
  67.     } finally {
  68.       closeQuietly(input);
  69.       closeQuietly(output, mapperContext);
  70.     }
  71.   }
复制代码
在DFSInputStream.java中有个read()方法,但我却不知道哪儿调用了这个方法,麻烦大家帮忙看看,多谢啦
比较着急,困扰我好几天了


已有(8)人评论

跳转到指定楼层
bioger_hit 发表于 2014-11-15 15:32:07

hadoop作业提交之Child启动map任务流程:

1.Child类根据前面输入的三个参数,即tasktracher的地址、端口、taskid。通过TaskUmbilicalProtocol协议,从TaskTracker获得需要执行的Task,在Child的main函数中调用执行。
2. 在Chilld中,执行Task的run方法。Task 的run方法。是真正执行用户定义的map或者reduce任务的入口,通过TaskUmbilicalProtocol向tasktracker上报执行进度。
3. 在MapTask的run中执行runMapper方法来调用mapper定义的方法。
4. 在runNewMapper方法中构造mapper实例和mapper执行的配置信息。并执行mapper.run方法来调用到用户定义的mapper的方法。
5. mapper的run方法中,从输入数据中逐一取出调用map方法来处理每一条数据
6. mapper的map方法是真正用户定义的处理数据的类。也是用户唯一需要定义的方法。


下面代码8中,即为调用run方法。


@SuppressWarnings("unchecked")
  private <INKEY,INVALUE,OUTKEY,OUTVALUE>
  void runNewMapper(final JobConf job,
                    final BytesWritable rawSplit,
                    final TaskUmbilicalProtocol umbilical,
                    TaskReporter reporter
                    ) throws IOException, ClassNotFoundException,
                             InterruptedException {
    // 1. 根据传入的jobconf构造一个context,包含了job相关的所有配置信息,如后面用到的mapper、inputformat等。
    org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
      new org.apache.hadoop.mapreduce.TaskAttemptContext(job, getTaskID());
    // 2. 根据配置的mapper类创建一个Mapper实例
    org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =
      (org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)
        ReflectionUtils.newInstance(taskContext.getMapperClass(), job);
    // 根据配置的input format创建一个InputFormat实例。
    org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat =
      (org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>)
        ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);
    // 4.重新够构建InputSplit
    org.apache.hadoop.mapreduce.InputSplit split = null;
    DataInputBuffer splitBuffer = new DataInputBuffer();
    splitBuffer.reset(rawSplit.getBytes(), 0, rawSplit.getLength());
    SerializationFactory factory = new SerializationFactory(job);
    Deserializer< extends org.apache.hadoop.mapreduce.InputSplit>
      deserializer =
        (Deserializer< extends org.apache.hadoop.mapreduce.InputSplit>)
        factory.getDeserializer(job.getClassByName(splitClass));
    deserializer.open(splitBuffer);
    split = deserializer.deserialize(null);

    //5. 创建RecordReader,其实使用的是适配器模式适配了inputFormat的Reader。
    org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =
      new NewTrackingRecordReader<INKEY,INVALUE>
          (inputFormat.createRecordReader(split, taskContext), reporter);

    job.setBoolean("mapred.skip.on", isSkipping());
    org.apache.hadoop.mapreduce.RecordWriter output = null;
    org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context
         mapperContext = null;
    try {
      Constructor<org.apache.hadoop.mapreduce.Mapper.Context> contextConstructor =
        org.apache.hadoop.mapreduce.Mapper.Context.class.getConstructor
        (new Class[]{org.apache.hadoop.mapreduce.Mapper.class,
                     Configuration.class,
                     org.apache.hadoop.mapreduce.TaskAttemptID.class,
                     org.apache.hadoop.mapreduce.RecordReader.class,
                     org.apache.hadoop.mapreduce.RecordWriter.class,
                     org.apache.hadoop.mapreduce.OutputCommitter.class,
                     org.apache.hadoop.mapreduce.StatusReporter.class,
                     org.apache.hadoop.mapreduce.InputSplit.class});

      //6. 构造输出RecordWriter。当没有Reducer时,output是配置的outputFormat的RecordWriter,即直接写输出。如果ruducer数量不为0,则构造一个NewOutputCollector
      if (job.getNumReduceTasks() == 0) {
        output = outputFormat.getRecordWriter(taskContext);
      } else {
        output = new NewOutputCollector(job, umbilical, reporter);
      }

      //7.构造Mapper.Context,封装了刚才配置的所有信息,在map执行时候时候使用。
      mapperContext = contextConstructor.newInstance(mapper, job, getTaskID(),
                                                     input, output, committer,
                                                     reporter, split);

      input.initialize(split, mapperContext);
      //8. 调用mapper的run方法来执行map动作。
      mapper.run(mapperContext);
      input.close();
      output.close(mapperContext);
    } catch (NoSuchMethodException e) {
      throw new IOException("Can't find Context constructor", e);
    } catch (InstantiationException e) {
      throw new IOException("Can't create Context", e);
    } catch (InvocationTargetException e) {
      throw new IOException("Can't invoke Context constructor", e);
    } catch (IllegalAccessException e) {
      throw new IOException("Can't invoke Context constructor", e);
    }
  }



回复

使用道具 举报

gwgyk 发表于 2014-11-15 16:01:28
bioger_hit 发表于 2014-11-15 15:32
hadoop作业提交之Child启动map任务流程:

1.Child类根据前面输入的三个参数,即tasktracher的地址、端 ...

谢谢你,这个方法解释的很详细。不过我主要还是想了解Map是如何联系DataNode来读取数据块。
  1. input.initialize(split, mapperContext);
复制代码
这句话中构建了FSDataInputStream对象,但是我却没看到在哪儿读取数据块,所以我想知道,在runNewMapper()方法中哪儿来读取数据块呢?
回复

使用道具 举报

desehawk 发表于 2014-11-15 17:11:17

hadoop作业提交之Child启动map任务


你可能找错地方了,因为走到map的数据都是已经分片的数据,也就是key和value,这已经是被读取的数据了。key是偏移量,而value则是从datanode读取的数据。所以在这里不可能是再次读取datanode的,因为已经读取完毕了。

读取块参考
Hadoop源码注释 - 块读取源码分析



回复

使用道具 举报

gwgyk 发表于 2014-11-15 17:15:35
desehawk 发表于 2014-11-15 17:11
你可能找错地方了,因为走到map的数据都是已经分片的数据,也就是key和value,这已经是被读取的数据了。ke ...

原来是这样,我一直是这么想的:Map Task自己去读取数据块。我这么理解是不是不对?
那么在TaskTracker执行Map Task的时候,读取数据块的部分是在哪儿执行的呢?谢谢啦
回复

使用道具 举报

desehawk 发表于 2014-11-15 17:47:26
gwgyk 发表于 2014-11-15 17:15
原来是这样,我一直是这么想的:Map Task自己去读取数据块。我这么理解是不是不对?
那么在TaskTracker ...
楼主,可以看看这个图:




应该是RecordReader读取的数据块。详细参考
Hadoop对Map执行框架流程分析(TaskTracker端)







回复

使用道具 举报

howtodown 发表于 2014-11-15 18:18:22
desehawk 发表于 2014-11-15 17:47
楼主,可以看看这个图:
RecordReader其实应该还是对 key ,value的处理。

真正读取块的内容,应该是LineRecordReader
从HDFS读取文件的源码分析

回复

使用道具 举报

gwgyk 发表于 2014-11-15 19:50:12
desehawk 发表于 2014-11-15 17:47
楼主,可以看看这个图:

文章还是没有具体说到读取数据块的内容啊

回复

使用道具 举报

howtodown 发表于 2014-11-15 20:29:29
gwgyk 发表于 2014-11-15 19:50
文章还是没有具体说到读取数据块的内容啊
你对下面的看法是什么?




5) RemoteBlockReader,对打开socket获取DataNode输入流DataInputStream进行封装,开始读取文件数据,文件数据是DataNode按照每个chunk封装一次传输给客户端,客户端给每个chunk检查数据完整性。
提供给上层读取socket中数据流接口:


开始从一个个chunk中读取数据:



每个chunk的读取:


6) Block传输读协议
先是客户端打开连接后,向DataNode输入block的信息:


然后DataNode按照一个个chunk向DFSClient发送数据流供读,chunk的数据格式,按照数据读取先后顺序罗列:

回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条