分享

Hadoop学习总结:Map-Reduce的过程解析

本帖最后由 xioaxu790 于 2014-10-10 07:50 编辑
问题导读
1、Map-Reduce提交一个任务是从哪里开始的?
2、JobTracker运行的main函数有哪几部分?
3、如何理解Map-Reduce实现的过程机制?






一、客户端
Map-Reduce的过程首先是由客户端提交一个任务开始的。
提交任务主要是通过JobClient.runJob(JobConf)静态函数实现的:
  1. public static RunningJob runJob(JobConf job) throws IOException {
  2.   //首先生成一个JobClient对象
  3.   JobClient jc = new JobClient(job);
  4.   ……
  5.   //调用submitJob来提交一个任务
  6.   running = jc.submitJob(job);
  7.   JobID jobId = running.getID();
  8.   ……
  9.   while (true) {
  10.      //while循环中不断得到此任务的状态,并打印到客户端console中
  11.   }
  12.   return running;
  13. }
  14. 其中JobClient的submitJob函数实现如下:
  15. public RunningJob submitJob(JobConf job) throws FileNotFoundException,
  16.                                 InvalidJobConfException, IOException {
  17.   //从JobTracker得到当前任务的id
  18.   JobID jobId = jobSubmitClient.getNewJobId();
  19.   //准备将任务运行所需要的要素写入HDFS:
  20.   //任务运行程序所在的jar封装成job.jar
  21.   //任务所要处理的input split信息写入job.split
  22.   //任务运行的配置项汇总写入job.xml
  23.   Path submitJobDir = new Path(getSystemDir(), jobId.toString());
  24.   Path submitJarFile = new Path(submitJobDir, "job.jar");
  25.   Path submitSplitFile = new Path(submitJobDir, "job.split");
  26.   //此处将-libjars命令行指定的jar上传至HDFS
  27.   configureCommandLineOptions(job, submitJobDir, submitJarFile);
  28.   Path submitJobFile = new Path(submitJobDir, "job.xml");
  29.   ……
  30.   //通过input format的格式获得相应的input split,默认类型为FileSplit
  31.   InputSplit[] splits =
  32.     job.getInputFormat().getSplits(job, job.getNumMapTasks());
  33.   // 生成一个写入流,将input split得信息写入job.split文件
  34.   FSDataOutputStream out = FileSystem.create(fs,
  35.       submitSplitFile, new FsPermission(JOB_FILE_PERMISSION));
  36.   try {
  37.     //写入job.split文件的信息包括:split文件头,split文件版本号,split的个数,接着依次写入每一个input split的信息。
  38.     //对于每一个input split写入:split类型名(默认FileSplit),split的大小,split的内容(对于FileSplit,写入文件名,此split在文件中的起始位置),split的location信息(即在那个DataNode上)。
  39.     writeSplitsFile(splits, out);
  40.   } finally {
  41.     out.close();
  42.   }
  43.   job.set("mapred.job.split.file", submitSplitFile.toString());
  44.   //根据split的个数设定map task的个数
  45.   job.setNumMapTasks(splits.length);
  46.   // 写入job的配置信息入job.xml文件      
  47.   out = FileSystem.create(fs, submitJobFile,
  48.       new FsPermission(JOB_FILE_PERMISSION));
  49.   try {
  50.     job.writeXml(out);
  51.   } finally {
  52.     out.close();
  53.   }
  54.   //真正的调用JobTracker来提交任务
  55.   JobStatus status = jobSubmitClient.submitJob(jobId);
  56.   ……
  57. }
复制代码


二、JobTracker
JobTracker作为一个单独的JVM运行,其运行的main函数主要调用有下面两部分:

调用静态函数startTracker(new JobConf())创建一个JobTracker对象
调用JobTracker.offerService()函数提供服务
在JobTracker的构造函数中,会生成一个taskScheduler成员变量,来进行Job的调度,默认为JobQueueTaskScheduler,也即按照FIFO的方式调度任务。

在offerService函数中,则调用taskScheduler.start(),在这个函数中,为JobTracker(也即taskScheduler的taskTrackerManager)注册了两个Listener:

JobQueueJobInProgressListener jobQueueJobInProgressListener用于监控job的运行状态
EagerTaskInitializationListener eagerTaskInitializationListener用于对Job进行初始化
EagerTaskInitializationListener中有一个线程JobInitThread,不断得到jobInitQueue中的JobInProgress对象,调用JobInProgress对象的initTasks函数对任务进行初始化操作。

在上一节中,客户端调用了JobTracker.submitJob函数,此函数首先生成一个JobInProgress对象,然后调用addJob函数,其中有如下的逻辑:
  1. synchronized (jobs) {
  2.   synchronized (taskScheduler) {
  3.     jobs.put(job.getProfile().getJobID(), job);
  4.     //对JobTracker的每一个listener都调用jobAdded函数
  5.     for (JobInProgressListener listener : jobInProgressListeners) {
  6.       listener.jobAdded(job);
  7.     }
  8.   }
  9. }
复制代码



EagerTaskInitializationListener的jobAdded函数就是向jobInitQueue中添加一个JobInProgress对象,于是自然触发了此Job的初始化操作,由JobInProgress得initTasks函数完成:

  1. public synchronized void initTasks() throws IOException {
  2.   ……
  3.   //从HDFS中读取job.split文件从而生成input splits
  4.   String jobFile = profile.getJobFile();
  5.   Path sysDir = new Path(this.jobtracker.getSystemDir());
  6.   FileSystem fs = sysDir.getFileSystem(conf);
  7.   DataInputStream splitFile =
  8.     fs.open(new Path(conf.get("mapred.job.split.file")));
  9.   JobClient.RawSplit[] splits;
  10.   try {
  11.     splits = JobClient.readSplitFile(splitFile);
  12.   } finally {
  13.     splitFile.close();
  14.   }
  15.   //map task的个数就是input split的个数
  16.   numMapTasks = splits.length;
  17.   //为每个map tasks生成一个TaskInProgress来处理一个input split
  18.   maps = new TaskInProgress[numMapTasks];
  19.   for(int i=0; i < numMapTasks; ++i) {
  20.     inputLength += splits[i].getDataLength();
  21.     maps[i] = new TaskInProgress(jobId, jobFile,
  22.                                  splits[i],
  23.                                  jobtracker, conf, this, i);
  24.   }
  25.   //对于map task,将其放入nonRunningMapCache,是一个Map<Node, List<TaskInProgress>>,也即对于map task来讲,其将会被分配到其input split所在的Node上。nonRunningMapCache将在JobTracker向TaskTracker分配map task的时候使用。
  26.   if (numMapTasks > 0) {
  27.     nonRunningMapCache = createCache(splits, maxLevel);
  28.   }
  29.   //创建reduce task
  30.   this.reduces = new TaskInProgress[numReduceTasks];
  31.   for (int i = 0; i < numReduceTasks; i++) {
  32.     reduces[i] = new TaskInProgress(jobId, jobFile,
  33.                                     numMapTasks, i,
  34.                                     jobtracker, conf, this);
  35.     //reduce task放入nonRunningReduces,其将在JobTracker向TaskTracker分配reduce task的时候使用。
  36.     nonRunningReduces.add(reduces[i]);
  37.   }
  38.   //创建两个cleanup task,一个用来清理map,一个用来清理reduce.
  39.   cleanup = new TaskInProgress[2];
  40.   cleanup[0] = new TaskInProgress(jobId, jobFile, splits[0],
  41.           jobtracker, conf, this, numMapTasks);
  42.   cleanup[0].setJobCleanupTask();
  43.   cleanup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
  44.                      numReduceTasks, jobtracker, conf, this);
  45.   cleanup[1].setJobCleanupTask();
  46.   //创建两个初始化 task,一个初始化map,一个初始化reduce.
  47.   setup = new TaskInProgress[2];
  48.   setup[0] = new TaskInProgress(jobId, jobFile, splits[0],
  49.           jobtracker, conf, this, numMapTasks + 1 );
  50.   setup[0].setJobSetupTask();
  51.   setup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
  52.                      numReduceTasks + 1, jobtracker, conf, this);
  53.   setup[1].setJobSetupTask();
  54.   tasksInited.set(true);//初始化完毕
  55.   ……
  56. }
复制代码



三、TaskTracker
TaskTracker也是作为一个单独的JVM来运行的,在其main函数中,主要是调用了new TaskTracker(conf).run(),其中run函数主要调用了:

  1. State offerService() throws Exception {
  2.   long lastHeartbeat = 0;
  3.   //TaskTracker进行是一直存在的
  4.   while (running && !shuttingDown) {
  5.       ……
  6.       long now = System.currentTimeMillis();
  7.       //每隔一段时间就向JobTracker发送heartbeat
  8.       long waitTime = heartbeatInterval - (now - lastHeartbeat);
  9.       if (waitTime > 0) {
  10.         synchronized(finishedCount) {
  11.           if (finishedCount[0] == 0) {
  12.             finishedCount.wait(waitTime);
  13.           }
  14.           finishedCount[0] = 0;
  15.         }
  16.       }
  17.       ……
  18.       //发送Heartbeat到JobTracker,得到response
  19.       HeartbeatResponse heartbeatResponse = transmitHeartBeat(now);
  20.       ……
  21.      //从Response中得到此TaskTracker需要做的事情
  22.       TaskTrackerAction[] actions = heartbeatResponse.getActions();
  23.       ……
  24.       if (actions != null){
  25.         for(TaskTrackerAction action: actions) {
  26.           if (action instanceof LaunchTaskAction) {
  27.             //如果是运行一个新的Task,则将Action添加到任务队列中
  28.             addToTaskQueue((LaunchTaskAction)action);
  29.           } else if (action instanceof CommitTaskAction) {
  30.             CommitTaskAction commitAction = (CommitTaskAction)action;
  31.             if (!commitResponses.contains(commitAction.getTaskID())) {
  32.               commitResponses.add(commitAction.getTaskID());
  33.             }
  34.           } else {
  35.             tasksToCleanup.put(action);
  36.           }
  37.         }
  38.       }
  39.   }
  40.   return State.NORMAL;
  41. }
  42. 其中transmitHeartBeat主要逻辑如下:
  43. private HeartbeatResponse transmitHeartBeat(long now) throws IOException {
  44.   //每隔一段时间,在heartbeat中要返回给JobTracker一些统计信息
  45.   boolean sendCounters;
  46.   if (now > (previousUpdate + COUNTER_UPDATE_INTERVAL)) {
  47.     sendCounters = true;
  48.     previousUpdate = now;
  49.   }
  50.   else {
  51.     sendCounters = false;
  52.   }
  53.   ……
  54.   //报告给JobTracker,此TaskTracker的当前状态
  55.   if (status == null) {
  56.     synchronized (this) {
  57.       status = new TaskTrackerStatus(taskTrackerName, localHostname,
  58.                                      httpPort,
  59.                                      cloneAndResetRunningTaskStatuses(
  60.                                        sendCounters),
  61.                                      failures,
  62.                                      maxCurrentMapTasks,
  63.                                      maxCurrentReduceTasks);
  64.     }
  65.   }
  66.   ……
  67.   //当满足下面的条件的时候,此TaskTracker请求JobTracker为其分配一个新的Task来运行:
  68.   //当前TaskTracker正在运行的map task的个数小于可以运行的map task的最大个数
  69.   //当前TaskTracker正在运行的reduce task的个数小于可以运行的reduce task的最大个数
  70.   boolean askForNewTask;
  71.   long localMinSpaceStart;
  72.   synchronized (this) {
  73.     askForNewTask = (status.countMapTasks() < maxCurrentMapTasks ||
  74.                      status.countReduceTasks() < maxCurrentReduceTasks) &&
  75.                     acceptNewTasks;
  76.     localMinSpaceStart = minSpaceStart;
  77.   }
  78.   ……
  79.   //向JobTracker发送heartbeat,这是一个RPC调用
  80.   HeartbeatResponse heartbeatResponse = jobClient.heartbeat(status,
  81.                                                             justStarted, askForNewTask,
  82.                                                             heartbeatResponseId);
  83.   ……
  84.   return heartbeatResponse;
  85. }
复制代码



四、JobTracker
当JobTracker被RPC调用来发送heartbeat的时候,JobTracker的heartbeat(TaskTrackerStatus status,boolean initialContact, boolean acceptNewTasks, short responseId)函数被调用:

  1. public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status,
  2.                                                 boolean initialContact, boolean acceptNewTasks, short responseId)
  3.   throws IOException {
  4.   ……
  5.   String trackerName = status.getTrackerName();
  6.   ……
  7.   short newResponseId = (short)(responseId + 1);
  8.   ……
  9.   HeartbeatResponse response = new HeartbeatResponse(newResponseId, null);
  10.   List<TaskTrackerAction> actions = new ArrayList<TaskTrackerAction>();
  11.   //如果TaskTracker向JobTracker请求一个task运行
  12.   if (acceptNewTasks) {
  13.     TaskTrackerStatus taskTrackerStatus = getTaskTracker(trackerName);
  14.     if (taskTrackerStatus == null) {
  15.       LOG.warn("Unknown task tracker polling; ignoring: " + trackerName);
  16.     } else {
  17.       //setup和cleanup的task优先级最高
  18.       List<Task> tasks = getSetupAndCleanupTasks(taskTrackerStatus);
  19.       if (tasks == null ) {
  20.         //任务调度器分配任务
  21.         tasks = taskScheduler.assignTasks(taskTrackerStatus);
  22.       }
  23.       if (tasks != null) {
  24.         for (Task task : tasks) {
  25.           //将任务放入actions列表,返回给TaskTracker
  26.           expireLaunchingTasks.addNewTask(task.getTaskID());
  27.           actions.add(new LaunchTaskAction(task));
  28.         }
  29.       }
  30.     }
  31.   }
  32.   ……
  33.   int nextInterval = getNextHeartbeatInterval();
  34.   response.setHeartbeatInterval(nextInterval);
  35.   response.setActions(
  36.                       actions.toArray(new TaskTrackerAction[actions.size()]));
  37.   ……
  38.   return response;
  39. }
复制代码


默认的任务调度器为JobQueueTaskScheduler,其assignTasks如下:
  1. public synchronized List<Task> assignTasks(TaskTrackerStatus taskTracker)
  2.     throws IOException {
  3.   ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus();
  4.   int numTaskTrackers = clusterStatus.getTaskTrackers();
  5.   Collection<JobInProgress> jobQueue = jobQueueJobInProgressListener.getJobQueue();
  6.   int maxCurrentMapTasks = taskTracker.getMaxMapTasks();
  7.   int maxCurrentReduceTasks = taskTracker.getMaxReduceTasks();
  8.   int numMaps = taskTracker.countMapTasks();
  9.   int numReduces = taskTracker.countReduceTasks();
  10.   //计算剩余的map和reduce的工作量:remaining
  11.   int remainingReduceLoad = 0;
  12.   int remainingMapLoad = 0;
  13.   synchronized (jobQueue) {
  14.     for (JobInProgress job : jobQueue) {
  15.       if (job.getStatus().getRunState() == JobStatus.RUNNING) {
  16.         int totalMapTasks = job.desiredMaps();
  17.         int totalReduceTasks = job.desiredReduces();
  18.         remainingMapLoad += (totalMapTasks - job.finishedMaps());
  19.         remainingReduceLoad += (totalReduceTasks - job.finishedReduces());
  20.       }
  21.     }
  22.   }
  23.   //计算平均每个TaskTracker应有的工作量,remaining/numTaskTrackers是剩余的工作量除以TaskTracker的个数。
  24.   int maxMapLoad = 0;
  25.   int maxReduceLoad = 0;
  26.   if (numTaskTrackers > 0) {
  27.     maxMapLoad = Math.min(maxCurrentMapTasks,
  28.                           (int) Math.ceil((double) remainingMapLoad /
  29.                                           numTaskTrackers));
  30.     maxReduceLoad = Math.min(maxCurrentReduceTasks,
  31.                              (int) Math.ceil((double) remainingReduceLoad
  32.                                              / numTaskTrackers));
  33.   }
  34.   ……
  35.   //map优先于reduce,当TaskTracker上运行的map task数目小于平均的工作量,则向其分配map task
  36.   if (numMaps < maxMapLoad) {
  37.     int totalNeededMaps = 0;
  38.     synchronized (jobQueue) {
  39.       for (JobInProgress job : jobQueue) {
  40.         if (job.getStatus().getRunState() != JobStatus.RUNNING) {
  41.           continue;
  42.         }
  43.         Task t = job.obtainNewMapTask(taskTracker, numTaskTrackers,
  44.             taskTrackerManager.getNumberOfUniqueHosts());
  45.         if (t != null) {
  46.           return Collections.singletonList(t);
  47.         }
  48.         ……
  49.       }
  50.     }
  51.   }
  52.   //分配完map task,再分配reduce task
  53.   if (numReduces < maxReduceLoad) {
  54.     int totalNeededReduces = 0;
  55.     synchronized (jobQueue) {
  56.       for (JobInProgress job : jobQueue) {
  57.         if (job.getStatus().getRunState() != JobStatus.RUNNING ||
  58.             job.numReduceTasks == 0) {
  59.           continue;
  60.         }
  61.         Task t = job.obtainNewReduceTask(taskTracker, numTaskTrackers,
  62.             taskTrackerManager.getNumberOfUniqueHosts());
  63.         if (t != null) {
  64.           return Collections.singletonList(t);
  65.         }
  66.         ……
  67.       }
  68.     }
  69.   }
  70.   return null;
  71. }
复制代码


从上面的代码中我们可以知道,JobInProgress的obtainNewMapTask是用来分配map task的,其主要调用findNewMapTask,根据TaskTracker所在的Node从nonRunningMapCache中查找TaskInProgress。JobInProgress的obtainNewReduceTask是用来分配reduce task的,其主要调用findNewReduceTask,从nonRunningReduces查找TaskInProgress。


五、TaskTracker
在向JobTracker发送heartbeat后,返回的reponse中有分配好的任务LaunchTaskAction,将其加入队列,调用addToTaskQueue,如果是map task则放入mapLancher(类型为TaskLauncher),如果是reduce task则放入reduceLancher(类型为TaskLauncher):
  1. private void addToTaskQueue(LaunchTaskAction action) {
  2.   if (action.getTask().isMapTask()) {
  3.     mapLauncher.addToTaskQueue(action);
  4.   } else {
  5.     reduceLauncher.addToTaskQueue(action);
  6.   }
  7. }
复制代码

TaskLauncher是一个线程,其run函数从上面放入的queue中取出一个TaskInProgress,然后调用startNewTask(TaskInProgress tip)来启动一个task,其又主要调用了localizeJob(TaskInProgress tip):

  1. private void localizeJob(TaskInProgress tip) throws IOException {
  2.   //首先要做的一件事情是有关Task的文件从HDFS拷贝的TaskTracker的本地文件系统中:job.split,job.xml以及job.jar
  3.   Path localJarFile = null;
  4.   Task t = tip.getTask();
  5.   JobID jobId = t.getJobID();
  6.   Path jobFile = new Path(t.getJobFile());
  7.   ……
  8.   Path localJobFile = lDirAlloc.getLocalPathForWrite(
  9.                                   getLocalJobDir(jobId.toString())
  10.                                   + Path.SEPARATOR + "job.xml",
  11.                                   jobFileSize, fConf);
  12.   RunningJob rjob = addTaskToJob(jobId, tip);
  13.   synchronized (rjob) {
  14.     if (!rjob.localized) {
  15.       FileSystem localFs = FileSystem.getLocal(fConf);
  16.       Path jobDir = localJobFile.getParent();
  17.       ……
  18.       //将job.split拷贝到本地
  19.       systemFS.copyToLocalFile(jobFile, localJobFile);
  20.       JobConf localJobConf = new JobConf(localJobFile);
  21.       Path workDir = lDirAlloc.getLocalPathForWrite(
  22.                        (getLocalJobDir(jobId.toString())
  23.                        + Path.SEPARATOR + "work"), fConf);
  24.       if (!localFs.mkdirs(workDir)) {
  25.         throw new IOException("Mkdirs failed to create "
  26.                     + workDir.toString());
  27.       }
  28.       System.setProperty("job.local.dir", workDir.toString());
  29.       localJobConf.set("job.local.dir", workDir.toString());
  30.       // copy Jar file to the local FS and unjar it.
  31.       String jarFile = localJobConf.getJar();
  32.       long jarFileSize = -1;
  33.       if (jarFile != null) {
  34.         Path jarFilePath = new Path(jarFile);
  35.         localJarFile = new Path(lDirAlloc.getLocalPathForWrite(
  36.                                    getLocalJobDir(jobId.toString())
  37.                                    + Path.SEPARATOR + "jars",
  38.                                    5 * jarFileSize, fConf), "job.jar");
  39.         if (!localFs.mkdirs(localJarFile.getParent())) {
  40.           throw new IOException("Mkdirs failed to create jars directory ");
  41.         }
  42.         //将job.jar拷贝到本地
  43.         systemFS.copyToLocalFile(jarFilePath, localJarFile);
  44.         localJobConf.setJar(localJarFile.toString());
  45.        //将job得configuration写成job.xml
  46.         OutputStream out = localFs.create(localJobFile);
  47.         try {
  48.           localJobConf.writeXml(out);
  49.         } finally {
  50.           out.close();
  51.         }
  52.         // 解压缩job.jar
  53.         RunJar.unJar(new File(localJarFile.toString()),
  54.                      new File(localJarFile.getParent().toString()));
  55.       }
  56.       rjob.localized = true;
  57.       rjob.jobConf = localJobConf;
  58.     }
  59.   }
  60.   //真正的启动此Task
  61.   launchTaskForJob(tip, new JobConf(rjob.jobConf));
  62. }
复制代码


当所有的task运行所需要的资源都拷贝到本地后,则调用launchTaskForJob,其又调用TaskInProgress的launchTask函数:

  1. public synchronized void launchTask() throws IOException {
  2.     ……
  3.     //创建task运行目录
  4.     localizeTask(task);
  5.     if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED) {
  6.       this.taskStatus.setRunState(TaskStatus.State.RUNNING);
  7.     }
  8.     //创建并启动TaskRunner,对于MapTask,创建的是MapTaskRunner,对于ReduceTask,创建的是ReduceTaskRunner
  9.     this.runner = task.createRunner(TaskTracker.this, this);
  10.     this.runner.start();
  11.     this.taskStatus.setStartTime(System.currentTimeMillis());
  12. }
复制代码


TaskRunner是一个线程,其run函数如下:

  1. public final void run() {
  2.     ……
  3.     TaskAttemptID taskid = t.getTaskID();
  4.     LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");
  5.     File jobCacheDir = null;
  6.     if (conf.getJar() != null) {
  7.       jobCacheDir = new File(
  8.                         new Path(conf.getJar()).getParent().toString());
  9.     }
  10.     File workDir = new File(lDirAlloc.getLocalPathToRead(
  11.                               TaskTracker.getLocalTaskDir(
  12.                                 t.getJobID().toString(),
  13.                                 t.getTaskID().toString(),
  14.                                 t.isTaskCleanupTask())
  15.                               + Path.SEPARATOR + MRConstants.WORKDIR,
  16.                               conf). toString());
  17.     FileSystem fileSystem;
  18.     Path localPath;
  19.     ……
  20.     //拼写classpath
  21.     String baseDir;
  22.     String sep = System.getProperty("path.separator");
  23.     StringBuffer classPath = new StringBuffer();
  24.     // start with same classpath as parent process
  25.     classPath.append(System.getProperty("java.class.path"));
  26.     classPath.append(sep);
  27.     if (!workDir.mkdirs()) {
  28.       if (!workDir.isDirectory()) {
  29.         LOG.fatal("Mkdirs failed to create " + workDir.toString());
  30.       }
  31.     }
  32.     String jar = conf.getJar();
  33.     if (jar != null) {      
  34.       // if jar exists, it into workDir
  35.       File[] libs = new File(jobCacheDir, "lib").listFiles();
  36.       if (libs != null) {
  37.         for (int i = 0; i < libs.length; i++) {
  38.           classPath.append(sep);            // add libs from jar to classpath
  39.           classPath.append(libs[i]);
  40.         }
  41.       }
  42.       classPath.append(sep);
  43.       classPath.append(new File(jobCacheDir, "classes"));
  44.       classPath.append(sep);
  45.       classPath.append(jobCacheDir);
  46.     }
  47.     ……
  48.     classPath.append(sep);
  49.     classPath.append(workDir);
  50.     //拼写命令行java及其参数
  51.     Vector<String> vargs = new Vector<String>(8);
  52.     File jvm =
  53.       new File(new File(System.getProperty("java.home"), "bin"), "java");
  54.     vargs.add(jvm.toString());
  55.     String javaOpts = conf.get("mapred.child.java.opts", "-Xmx200m");
  56.     javaOpts = javaOpts.replace("@taskid@", taskid.toString());
  57.     String [] javaOptsSplit = javaOpts.split(" ");
  58.     String libraryPath = System.getProperty("java.library.path");
  59.     if (libraryPath == null) {
  60.       libraryPath = workDir.getAbsolutePath();
  61.     } else {
  62.       libraryPath += sep + workDir;
  63.     }
  64.     boolean hasUserLDPath = false;
  65.     for(int i=0; i<javaOptsSplit.length ;i++) {
  66.       if(javaOptsSplit[i].startsWith("-Djava.library.path=")) {
  67.         javaOptsSplit[i] += sep + libraryPath;
  68.         hasUserLDPath = true;
  69.         break;
  70.       }
  71.     }
  72.     if(!hasUserLDPath) {
  73.       vargs.add("-Djava.library.path=" + libraryPath);
  74.     }
  75.     for (int i = 0; i < javaOptsSplit.length; i++) {
  76.       vargs.add(javaOptsSplit[i]);
  77.     }
  78.     //添加Child进程的临时文件夹
  79.     String tmp = conf.get("mapred.child.tmp", "./tmp");
  80.     Path tmpDir = new Path(tmp);
  81.     if (!tmpDir.isAbsolute()) {
  82.       tmpDir = new Path(workDir.toString(), tmp);
  83.     }
  84.     FileSystem localFs = FileSystem.getLocal(conf);
  85.     if (!localFs.mkdirs(tmpDir) && !localFs.getFileStatus(tmpDir).isDir()) {
  86.       throw new IOException("Mkdirs failed to create " + tmpDir.toString());
  87.     }
  88.     vargs.add("-Djava.io.tmpdir=" + tmpDir.toString());
  89.     // Add classpath.
  90.     vargs.add("-classpath");
  91.     vargs.add(classPath.toString());
  92.     //log文件夹
  93.     long logSize = TaskLog.getTaskLogLength(conf);
  94.     vargs.add("-Dhadoop.log.dir=" +
  95.         new File(System.getProperty("hadoop.log.dir")
  96.         ).getAbsolutePath());
  97.     vargs.add("-Dhadoop.root.logger=INFO,TLA");
  98.     vargs.add("-Dhadoop.tasklog.taskid=" + taskid);
  99.     vargs.add("-Dhadoop.tasklog.totalLogFileSize=" + logSize);
  100.     // 运行map task和reduce task的子进程的main class是Child
  101.     vargs.add(Child.class.getName());  // main of Child
  102.     ……
  103.     //运行子进程
  104.     jvmManager.launchJvm(this,
  105.         jvmManager.constructJvmEnv(setup,vargs,stdout,stderr,logSize,
  106.             workDir, env, pidFile, conf));
  107. }
复制代码



六、Child
真正的map task和reduce task都是在Child进程中运行的,Child的main函数的主要逻辑如下:
  1. while (true) {
  2.   //从TaskTracker通过网络通信得到JvmTask对象
  3.   JvmTask myTask = umbilical.getTask(jvmId);
  4.   ……
  5.   idleLoopCount = 0;
  6.   task = myTask.getTask();
  7.   taskid = task.getTaskID();
  8.   isCleanup = task.isTaskCleanupTask();
  9.   JobConf job = new JobConf(task.getJobFile());
  10.   TaskRunner.setupWorkDir(job);
  11.   numTasksToExecute = job.getNumTasksToExecutePerJvm();
  12.   task.setConf(job);
  13.   defaultConf.addResource(new Path(task.getJobFile()));
  14.   ……
  15.   //运行task
  16.   task.run(job, umbilical);             // run the task
  17.   if (numTasksToExecute > 0 && ++numTasksExecuted == numTasksToExecute) {
  18.     break;
  19.   }
  20. }
复制代码


6.1、MapTask
如果task是MapTask,则其run函数如下:

  1. public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
  2.   throws IOException {
  3.   //用于同TaskTracker进行通信,汇报运行状况
  4.   final Reporter reporter = getReporter(umbilical);
  5.   startCommunicationThread(umbilical);
  6.   initialize(job, reporter);
  7.   ……
  8.   //map task的输出
  9.   int numReduceTasks = conf.getNumReduceTasks();
  10.   MapOutputCollector collector = null;
  11.   if (numReduceTasks > 0) {
  12.     collector = new MapOutputBuffer(umbilical, job, reporter);
  13.   } else {
  14.     collector = new DirectMapOutputCollector(umbilical, job, reporter);
  15.   }
  16.   //读取input split,按照其中的信息,生成RecordReader来读取数据
  17. instantiatedSplit = (InputSplit)
  18.       ReflectionUtils.newInstance(job.getClassByName(splitClass), job);
  19.   DataInputBuffer splitBuffer = new DataInputBuffer();
  20.   splitBuffer.reset(split.getBytes(), 0, split.getLength());
  21.   instantiatedSplit.readFields(splitBuffer);
  22.   if (instantiatedSplit instanceof FileSplit) {
  23.     FileSplit fileSplit = (FileSplit) instantiatedSplit;
  24.     job.set("map.input.file", fileSplit.getPath().toString());
  25.     job.setLong("map.input.start", fileSplit.getStart());
  26.     job.setLong("map.input.length", fileSplit.getLength());
  27.   }
  28.   RecordReader rawIn =                  // open input
  29.     job.getInputFormat().getRecordReader(instantiatedSplit, job, reporter);
  30.   RecordReader in = isSkipping() ?
  31.       new SkippingRecordReader(rawIn, getCounters(), umbilical) :
  32.       new TrackedRecordReader(rawIn, getCounters());
  33.   job.setBoolean("mapred.skip.on", isSkipping());
  34.   //对于map task,生成一个MapRunnable,默认是MapRunner
  35.   MapRunnable runner =
  36.     ReflectionUtils.newInstance(job.getMapRunnerClass(), job);
  37.   try {
  38.     //MapRunner的run函数就是依次读取RecordReader中的数据,然后调用Mapper的map函数进行处理。
  39.     runner.run(in, collector, reporter);     
  40.     collector.flush();
  41.   } finally {
  42.     in.close();                               // close input
  43.     collector.close();
  44.   }
  45.   done(umbilical);
  46. }
复制代码

MapRunner的run函数就是依次读取RecordReader中的数据,然后调用Mapper的map函数进行处理:

  1. public void run(RecordReader<K1, V1> input, OutputCollector<K2, V2> output,
  2.                 Reporter reporter)
  3.   throws IOException {
  4.   try {
  5.     K1 key = input.createKey();
  6.     V1 value = input.createValue();
  7.     while (input.next(key, value)) {
  8.       mapper.map(key, value, output, reporter);
  9.       if(incrProcCount) {
  10.         reporter.incrCounter(SkipBadRecords.COUNTER_GROUP,
  11.             SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDS, 1);
  12.       }
  13.     }
  14.   } finally {
  15.     mapper.close();
  16.   }
  17. }
复制代码



结果集全部收集到MapOutputBuffer中,其collect函数如下:

  1. public synchronized void collect(K key, V value)
  2.     throws IOException {
  3.   reporter.progress();
  4.   ……
  5.   //从此处看,此buffer是一个ring的数据结构
  6.   final int kvnext = (kvindex + 1) % kvoffsets.length;
  7.   spillLock.lock();
  8.   try {
  9.     boolean kvfull;
  10.     do {
  11.       //在ring中,如果下一个空闲位置接上起始位置的话,则表示满了
  12.       kvfull = kvnext == kvstart;
  13.       //在ring中计算是否需要将buffer写入硬盘的阈值
  14.       final boolean kvsoftlimit = ((kvnext > kvend)
  15.           ? kvnext - kvend > softRecordLimit
  16.           : kvend - kvnext <= kvoffsets.length - softRecordLimit);
  17.       //如果到达阈值,则开始将buffer写入硬盘,写成spill文件。
  18.       //startSpill主要是notify一个背后线程SpillThread的run()函数,开始调用sortAndSpill()开始排序,合并,写入硬盘
  19.       if (kvstart == kvend && kvsoftlimit) {
  20.         startSpill();
  21.       }
  22.       //如果buffer满了,则只能等待写入完毕
  23.       if (kvfull) {
  24.           while (kvstart != kvend) {
  25.             reporter.progress();
  26.             spillDone.await();
  27.           }
  28.       }
  29.     } while (kvfull);
  30.   } finally {
  31.     spillLock.unlock();
  32.   }
  33.   try {
  34.     //如果buffer不满,则将key, value写入buffer
  35.     int keystart = bufindex;
  36.     keySerializer.serialize(key);
  37.     final int valstart = bufindex;
  38.     valSerializer.serialize(value);
  39.     int valend = bb.markRecord();
  40.     //调用设定的partitioner,根据key, value取得partition id
  41.     final int partition = partitioner.getPartition(key, value, partitions);
  42.     mapOutputRecordCounter.increment(1);
  43.     mapOutputByteCounter.increment(valend >= keystart
  44.         ? valend - keystart
  45.         : (bufvoid - keystart) + valend);
  46.     //将parition id以及key, value在buffer中的偏移量写入索引数组
  47.     int ind = kvindex * ACCTSIZE;
  48.     kvoffsets[kvindex] = ind;
  49.     kvindices[ind + PARTITION] = partition;
  50.     kvindices[ind + KEYSTART] = keystart;
  51.     kvindices[ind + VALSTART] = valstart;
  52.     kvindex = kvnext;
  53.   } catch (MapBufferTooSmallException e) {
  54.     LOG.info("Record too large for in-memory buffer: " + e.getMessage());
  55.     spillSingleRecord(key, value);
  56.     mapOutputRecordCounter.increment(1);
  57.     return;
  58.   }
  59. }
复制代码



内存buffer的格式如下:

(见几位hadoop大侠的分析http://blog.csdn.net/HEYUTAO007/archive/2010/07/10/5725379.aspx 以及http://caibinbupt.javaeye.com/)

1.jpg

kvoffsets是为了写入内存前排序使用的。

从上面可知,内存buffer写入硬盘spill文件的函数为sortAndSpill:

  1. private void sortAndSpill() throws IOException {
  2.   ……
  3.   FSDataOutputStream out = null;
  4.   FSDataOutputStream indexOut = null;
  5.   IFileOutputStream indexChecksumOut = null;
  6.   //创建硬盘上的spill文件
  7.   Path filename = mapOutputFile.getSpillFileForWrite(getTaskID(),
  8.                                   numSpills, size);
  9.   out = rfs.create(filename);
  10.   ……
  11.   final int endPosition = (kvend > kvstart)
  12.     ? kvend
  13.     : kvoffsets.length + kvend;
  14.   //按照partition的顺序对buffer中的数据进行排序
  15.   sorter.sort(MapOutputBuffer.this, kvstart, endPosition, reporter);
  16.   int spindex = kvstart;
  17.   InMemValBytes value = new InMemValBytes();
  18.   //依次一个一个parition的写入文件
  19.   for (int i = 0; i < partitions; ++i) {
  20.     IFile.Writer<K, V> writer = null;
  21.     long segmentStart = out.getPos();
  22.     writer = new Writer<K, V>(job, out, keyClass, valClass, codec);
  23.     //如果combiner为空,则直接写入文件
  24.     if (null == combinerClass) {
  25.         ……
  26.         writer.append(key, value);
  27.         ++spindex;
  28.      }
  29.      else {
  30.         ……
  31.         //如果combiner不为空,则先combine,调用combiner.reduce(…)函数后再写入文件
  32.         combineAndSpill(kvIter, combineInputCounter);
  33.      }
  34.   }
  35.   ……
  36. }
复制代码


当map阶段结束的时候,MapOutputBuffer的flush函数会被调用,其也会调用sortAndSpill将buffer中的写入文件,然后再调用mergeParts来合并写入在硬盘上的多个spill:
  1. private void mergeParts() throws IOException {
  2.     ……
  3.     //对于每一个partition
  4.     for (int parts = 0; parts < partitions; parts++){
  5.       //create the segments to be merged
  6.       List<Segment<K, V>> segmentList =
  7.         new ArrayList<Segment<K, V>>(numSpills);
  8.       TaskAttemptID mapId = getTaskID();
  9.        //依次从各个spill文件中收集属于当前partition的段
  10.       for(int i = 0; i < numSpills; i++) {
  11.         final IndexRecord indexRecord =
  12.           getIndexInformation(mapId, i, parts);
  13.         long segmentOffset = indexRecord.startOffset;
  14.         long segmentLength = indexRecord.partLength;
  15.         Segment<K, V> s =
  16.           new Segment<K, V>(job, rfs, filename[i], segmentOffset,
  17.                             segmentLength, codec, true);
  18.         segmentList.add(i, s);
  19.       }
  20.       //将属于同一个partition的段merge到一起
  21.       RawKeyValueIterator kvIter =
  22.         Merger.merge(job, rfs,
  23.                      keyClass, valClass,
  24.                      segmentList, job.getInt("io.sort.factor", 100),
  25.                      new Path(getTaskID().toString()),
  26.                      job.getOutputKeyComparator(), reporter);
  27.       //写入合并后的段到文件
  28.       long segmentStart = finalOut.getPos();
  29.       Writer<K, V> writer =
  30.           new Writer<K, V>(job, finalOut, keyClass, valClass, codec);
  31.       if (null == combinerClass || numSpills < minSpillsForCombine) {
  32.         Merger.writeFile(kvIter, writer, reporter, job);
  33.       } else {
  34.         combineCollector.setWriter(writer);
  35.         combineAndSpill(kvIter, combineInputCounter);
  36.       }
  37.       ……
  38.     }
  39. }
复制代码

6.2、ReduceTask
ReduceTask的run函数如下:
  1. public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
  2.   throws IOException {
  3.   job.setBoolean("mapred.skip.on", isSkipping());
  4.   //对于reduce,则包含三个步骤:拷贝,排序,Reduce
  5.   if (isMapOrReduce()) {
  6.     copyPhase = getProgress().addPhase("copy");
  7.     sortPhase  = getProgress().addPhase("sort");
  8.     reducePhase = getProgress().addPhase("reduce");
  9.   }
  10.   startCommunicationThread(umbilical);
  11.   final Reporter reporter = getReporter(umbilical);
  12.   initialize(job, reporter);
  13.   //copy阶段,主要使用ReduceCopier的fetchOutputs函数获得map的输出。创建多个线程MapOutputCopier,其中copyOutput进行拷贝。
  14.   boolean isLocal = "local".equals(job.get("mapred.job.tracker", "local"));
  15.   if (!isLocal) {
  16.     reduceCopier = new ReduceCopier(umbilical, job);
  17.     if (!reduceCopier.fetchOutputs()) {
  18.         ……
  19.     }
  20.   }
  21.   copyPhase.complete();
  22.   //sort阶段,将得到的map输出合并,直到文件数小于io.sort.factor时停止,返回一个Iterator用于访问key-value
  23.   setPhase(TaskStatus.Phase.SORT);
  24.   statusUpdate(umbilical);
  25.   final FileSystem rfs = FileSystem.getLocal(job).getRaw();
  26.   RawKeyValueIterator rIter = isLocal
  27.     ? Merger.merge(job, rfs, job.getMapOutputKeyClass(),
  28.         job.getMapOutputValueClass(), codec, getMapFiles(rfs, true),
  29.         !conf.getKeepFailedTaskFiles(), job.getInt("io.sort.factor", 100),
  30.         new Path(getTaskID().toString()), job.getOutputKeyComparator(),
  31.         reporter)
  32.     : reduceCopier.createKVIterator(job, rfs, reporter);
  33.   mapOutputFilesOnDisk.clear();
  34.   sortPhase.complete();
  35.   //reduce阶段
  36.   setPhase(TaskStatus.Phase.REDUCE);
  37.   ……
  38.   Reducer reducer = ReflectionUtils.newInstance(job.getReducerClass(), job);
  39.   Class keyClass = job.getMapOutputKeyClass();
  40.   Class valClass = job.getMapOutputValueClass();
  41.   ReduceValuesIterator values = isSkipping() ?
  42.      new SkippingReduceValuesIterator(rIter,
  43.           job.getOutputValueGroupingComparator(), keyClass, valClass,
  44.           job, reporter, umbilical) :
  45.       new ReduceValuesIterator(rIter,
  46.       job.getOutputValueGroupingComparator(), keyClass, valClass,
  47.       job, reporter);
  48.   //逐个读出key-value list,然后调用Reducer的reduce函数
  49.   while (values.more()) {
  50.     reduceInputKeyCounter.increment(1);
  51.     reducer.reduce(values.getKey(), values, collector, reporter);
  52.     values.nextKey();
  53.     values.informReduceProgress();
  54.   }
  55.   reducer.close();
  56.   out.close(reporter);
  57.   done(umbilical);
  58. }
复制代码




七、总结
Map-Reduce的过程总结如下图:
1.jpg




没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条