分享

hadoop作业提交之TaskTracker 启动task

问题导读
1、如何初始化job的目录?
2、在TaskRunner的run方法中构建一个java命令的执行条件,包含哪些内容?
3、JvmRunner线程都有哪些方法?





一、概要描述

上篇博文描 述了TaskTracker从Jobtracker如何从JobTracker获取到要执行的Task。在从JobTracker获取到 LaunchTaskAction后,执行addToTaskQueue方法来把要执行的Task加入到queue。在本篇博文中,我们来关注下该方法 后,TaskTracker怎么来处理这些Task。

实际上,TaskTracker初始化时,会初始化并启动两个TaskLauncher类型的线程,mapLauncher,reduceLauncher。在TaskTracker从JobTracher获取到任务后,对应的会把任务添加到两个 TaskLauncher的Queue中,其实是TaskLauncher维护的一个列表List<TaskInProgress> tasksToLaunch。
TaskLauncher线程一直会定时检查TaskTracher上面有slot开业运行新的Task,则启动 Task。在这个过程中,先把task运行需要的文件解压到本地,并创建根据Task类型(Map或者Reduce)创建一个TaskRunner线程, 在TaskRunner中JvmManager调用JvmManagerForType、JvmRunner来启动一个java进程来执行Map或Reduce任务。

本文只是介绍到启动一个java进程,至于是什么样的java进程,对于maptask和reducetask分别是怎么执行的,在后面的child启动maptask,和child启动reducetask 会比较详细的介绍。

二、 流程描述

1.tasktracker的offerService方法获取到要执行的task后调用addToTaskQueue方法,其实是调用taskrunner的addToTaskQueue方法
2.TaskLauncher内部维护了一个List<TaskInProgress> tasksToLaunch,只是把task加入到该集合中
3.taskLauncher是一个线程,在其run方法中从tasksToLaunch集合中取出task来执行,调用Tasktracker的startNewTask方法启动task。
4. startNewtask方法中调用localizeJob方法把job相关的配置信息和要运行的jar拷贝到tasktracker本地,然后调用taskInProgress的launchTask方法来启动task。
5.TaskInProgress的launchTask方法先调用localizeTask(task把task相关的配置信息获取到本地。然后创建一个TaskRunner线程来启动task。
6.在TaskRunner的run方法中构建一个java命令的执行的条件,包括引用类,执行目录等,入口类是Child。然后调用JvmManager 的launchJvm方法来调用。
7.JvmManager 进而调用 JvmManagerForType的reapJvm,和spawnNewJvm 方法,发起调用。
8. 在JvmManagerForType的spawnNewJvm 方法中创建了一个JvmRunner线程类执行调用。
9. JvmRunner线程的run反复调用runChild方法来执行 一个命令行的调用。

1.jpg

TaskTracker 启动task


三、代码详细

1. TaskTracker的 addToTaskQueue方法。

接上文的最后一个方法的在heartbeat中把根据jobtracker的指令把需要launch的task调用addToTaskQueue方法加入task queue。
  1. //根据task的类型不同加入到不同的launcher中。
  2. private void addToTaskQueue(LaunchTaskAction action) {
  3. if (action.getTask().isMapTask()) {
  4. mapLauncher.addToTaskQueue(action);
  5. }else {
  6. reduceLauncher.addToTaskQueue(action);
  7. }
  8. }
复制代码


2. TaskLauncher 的addToTaskQueue方法,即把要launch的task加入到TaskLauncher内维护的一个列表List<TaskInProgress> tasksToLaunch;中。
  1. public void addToTaskQueue(LaunchTaskAction action) {
  2.       synchronized (tasksToLaunch) {
  3.         TaskInProgress tip = registerTask(action, this);
  4.         tasksToLaunch.add(tip);
  5.         tasksToLaunch.notifyAll();
  6.       }
  7. }
复制代码



3. TaskLauncher线程的run方法。TaskLauncher是一个线程。一直检查task列表中有数据,取出一个来执行。
  1. public void run() {   
  2.           TaskInProgress tip;
  3.           synchronized (tasksToLaunch) {
  4.             while (tasksToLaunch.isEmpty()) {
  5.               tasksToLaunch.wait();
  6.             }
  7.             //get the TIP
  8.             tip = tasksToLaunch.remove(0);
  9.            //wait for a slot to run
  10.           synchronized (numFreeSlots) {
  11.             while (numFreeSlots.get() == 0) {
  12.               numFreeSlots.wait();
  13.             }
  14.             LOG.info("In TaskLauncher, current free slots : " + numFreeSlots.get()+
  15.                 " and trying to launch "+tip.getTask().getTaskID());
  16.             numFreeSlots.set(numFreeSlots.get() - 1);
  17.             assert (numFreeSlots.get() >= 0);
  18.           }
  19.           //got a free slot. launch the task
  20.           startNewTask(tip);
  21.               return; // ALL DONE
  22.              }
  23.     }
  24.   }
复制代码



4. TaskTracker的startNewTask 启动一个新task。该方法的主要代码就一句。
  1. localizeJob(tip);
复制代码



5. TaskTracker的localizeJob方法。 初始化job的目录
  1. private void localizeJob(TaskInProgress tip) throws IOException {
  2.         Path localJarFile = null;
  3.         Task t = tip.getTask();
  4.         JobID jobId = t.getJobID();
  5.         Path jobFile = new Path(t.getJobFile());
  6.         FileStatus status = null;
  7.         long jobFileSize = -1;
  8.         status = systemFS.getFileStatus(jobFile);
  9.         jobFileSize = status.getLen();
  10.         Path localJobFile = lDirAlloc.getLocalPathForWrite(
  11.                 getLocalJobDir(jobId.toString())
  12.                 + Path.SEPARATOR + "job.xml",
  13.                 jobFileSize, fConf);
  14.         RunningJob rjob = addTaskToJob(jobId, tip);
  15.         synchronized (rjob) {
  16.             if (!rjob.localized) {
  17.                 FileSystem localFs = FileSystem.getLocal(fConf);
  18.                 systemFS.copyToLocalFile(jobFile, localJobFile);
  19.                 JobConf localJobConf = new JobConf(localJobFile);
  20.                 Path workDir = lDirAlloc.getLocalPathForWrite(
  21.                         (getLocalJobDir(jobId.toString())
  22.                                 + Path.SEPARATOR + "work"), fConf);
  23.                 System.setProperty("job.local.dir", workDir.toString());
  24.                 localJobConf.set("job.local.dir", workDir.toString());
  25.                 //把job的jar文件拷贝到本地文件系统并且解压。
  26.                 String jarFile = localJobConf.getJar();
  27.                 Path jarFilePath = new Path(jarFile);
  28.                 status = systemFS.getFileStatus(jarFilePath);
  29.                 jarFileSize = status.getLen();
  30.                 //保证释放的目录容量有5倍的jar文件大小
  31.                 localJarFile = new Path(lDirAlloc.getLocalPathForWrite(
  32.                         getLocalJobDir(jobId.toString())
  33.                         + Path.SEPARATOR + "jars",
  34.                         5 * jarFileSize, fConf), "job.jar");
  35.                 //把jar文件拷贝到本地
  36.                 systemFS.copyToLocalFile(jarFilePath, localJarFile);
  37.                 localJobConf.setJar(localJarFile.toString());
  38.                 OutputStream out = localFs.create(localJobFile);
  39.                 localJobConf.writeXml(out);
  40.                 // also unjar the job.jar files
  41.                 RunJar.unJar(new File(localJarFile.toString()),
  42.                         new File(localJarFile.getParent().toString()));
  43.             }
  44.             rjob.keepJobFiles = ((localJobConf.getKeepTaskFilesPattern() != null) ||
  45.                     localJobConf.getKeepFailedTaskFiles());
  46.             rjob.localized = true;
  47.             rjob.jobConf = localJobConf;
  48.         }
  49.     }
  50.     launchTaskForJob(tip, new JobConf(rjob.jobConf));
  51. }
复制代码



6. TaskTracker的addTaskToJob方法。只是把job和task的关系加入到runningJobs中。
  1. private RunningJob addTaskToJob(JobID jobId,
  2.                                   TaskInProgress tip) {
  3.     synchronized (runningJobs) {
  4.       RunningJob rJob = null;
  5.       if (!runningJobs.containsKey(jobId)) {
  6.         rJob = new RunningJob(jobId);
  7.         rJob.localized = false;
  8.         rJob.tasks = new HashSet<TaskInProgress>();
  9.         runningJobs.put(jobId, rJob);
  10.       } else {
  11.         rJob = runningJobs.get(jobId);
  12.       }
  13.       synchronized (rJob) {
  14.         rJob.tasks.add(tip);
  15.       }
  16.       runningJobs.notify(); //notify the fetcher thread
  17.       return rJob;
  18.     }
  19.   }
复制代码



7. TaskTracker的launchTaskForJob方法。调用TaskInprogress的launchTask方法。
  1. private void launchTaskForJob(TaskInProgress tip, JobConf jobConf) {
  2.     synchronized (tip) {
  3.       tip.setJobConf(jobConf);
  4.       tip.launchTask();
  5.     }
  6.   }
复制代码



8. TaskIProgress的 launchTask方法。
  1.     public synchronized void launchTask() throws IOException {
  2.         if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED ||
  3.                 this.taskStatus.getRunState() == TaskStatus.State.FAILED_UNCLEAN ||
  4.                 this.taskStatus.getRunState() == TaskStatus.State.KILLED_UNCLEAN) {
  5.             localizeTask(task);
  6.             if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED) {
  7.                 this.taskStatus.setRunState(TaskStatus.State.RUNNING);
  8.             }
  9.             //创建一个Runner来运行。
  10.             this.runner = task.createRunner(TaskTracker.this, this);
  11.             this.runner.start();
  12.             this.taskStatus.setStartTime(System.currentTimeMillis());
  13.         }
复制代码



9.TaskinProgress的localizeTask方法。把Task相关的文件拷贝到本地。
  1. private void localizeTask(Task task) throws IOException{
  2.       Path localTaskDir =
  3.         lDirAlloc.getLocalPathForWrite(
  4.           TaskTracker.getLocalTaskDir(task.getJobID().toString(),
  5.             task.getTaskID().toString(), task.isTaskCleanupTask()),
  6.           defaultJobConf );
  7.       FileSystem localFs = FileSystem.getLocal(fConf);
  8.       // create symlink for ../work if it already doesnt exist
  9.       String workDir = lDirAlloc.getLocalPathToRead(
  10.                          TaskTracker.getLocalJobDir(task.getJobID().toString())
  11.                          + Path.SEPARATOR  
  12.                          + "work", defaultJobConf).toString();
  13.       String link = localTaskDir.getParent().toString()
  14.                       + Path.SEPARATOR + "work";
  15.       File flink = new File(link);
  16.       if (!flink.exists())
  17.         FileUtil.symLink(workDir, link);
  18.       // 创建task的工作目录
  19.       Path cwd = lDirAlloc.getLocalPathForWrite(
  20.                    getLocalTaskDir(task.getJobID().toString(),
  21.                       task.getTaskID().toString(), task.isTaskCleanupTask())
  22.                    + Path.SEPARATOR + MRConstants.WORKDIR,
  23.                    defaultJobConf);
  24.       Path localTaskFile = new Path(localTaskDir, "job.xml");
  25.       task.setJobFile(localTaskFile.toString());
  26.       localJobConf.set("mapred.local.dir",
  27.                        fConf.get("mapred.local.dir"));
  28.       localJobConf.set("mapred.task.id", task.getTaskID().toString());     
  29.       }
  30.             OutputStream out = localFs.create(localTaskFile);
  31.              localJobConf.writeXml(out);
  32.       task.setConf(localJobConf);
  33.     }
复制代码



10.Task是个抽象类,两个子类分别是MapTask和ReduceTask。先关注Map的TaskRunner。
  1. public TaskRunner createRunner(TaskTracker tracker,
  2.       TaskTracker.TaskInProgress tip) {
  3.     return new MapTaskRunner(tip, tracker, this.conf);
  4.   }
复制代码



11. TaskRunner线程的Run方法,有420行代码!主要作用是根据配置信息,构造java命令,启动一个java进程。

拼接一个java指令,启动一个单独的java进程来执行每一个map或者reduce任务。这个java命令的class是Child。即这个java进程最终调用的是Child类的main函数。

TaskRunner的run方法
12.JvmManager的 launchJvm方法。在TaskRunner的run方法,是构造一个java命令的参数,调用JvmManager的launchJvm方法执行。
  1. public void launchJvm(TaskRunner t, JvmEnv env) {
  2.     if (t.getTask().isMapTask()) {
  3.       mapJvmManager.reapJvm(t, env);
  4.     } else {
  5.       reduceJvmManager.reapJvm(t, env);
  6.     }
  7.   }
复制代码



13. JvmManagerForType的reapJvm方法
  1. private synchronized void reapJvm(
  2.         TaskRunner t, JvmEnv env) {
  3.       if (t.getTaskInProgress().wasKilled()) {
  4.        //如果task被杀死则直接返回
  5.         return;
  6.       }
  7.       boolean spawnNewJvm = false;
  8.       JobID jobId = t.getTask().getJobID();
  9.       //检查是否有空闲的槽,如果小于最大jvm数,则重新开启一个jvm,不让你从现有job的空闲jvm中选择一个,或者杀死另外job的空闲jvm
  10.       int numJvmsSpawned = jvmIdToRunner.size();
  11.       JvmRunner runnerToKill = null;
  12.       if (numJvmsSpawned >= maxJvms) {
  13.         //go through the list of JVMs for all jobs.
  14.         Iterator<Map.Entry<JVMId, JvmRunner>> jvmIter =
  15.           jvmIdToRunner.entrySet().iterator();
  16.         while (jvmIter.hasNext()) {
  17.           JvmRunner jvmRunner = jvmIter.next().getValue();
  18.           JobID jId = jvmRunner.jvmId.getJobId();
  19.           //look for a free JVM for this job; if one exists then just break
  20.           if (jId.equals(jobId) && !jvmRunner.isBusy() && !jvmRunner.ranAll()){
  21.             setRunningTaskForJvm(jvmRunner.jvmId, t); //reserve the JVM
  22.             LOG.info("No new JVM spawned for jobId/taskid: " +
  23.                      jobId+"/"+t.getTask().getTaskID() +
  24.                      ". Attempting to reuse: " + jvmRunner.jvmId);
  25.             return;
  26.           }
  27.           if ((jId.equals(jobId) && jvmRunner.ranAll()) ||
  28.               (!jId.equals(jobId) && !jvmRunner.isBusy())) {
  29.             runnerToKill = jvmRunner;
  30.             spawnNewJvm = true;
  31.           }
  32.         }
  33.       } else {
  34.         spawnNewJvm = true;
  35.       }
  36.       if (spawnNewJvm) {
  37.         if (runnerToKill != null) {
  38.           LOG.info("Killing JVM: " + runnerToKill.jvmId);
  39.           runnerToKill.kill();
  40.         }
  41.         spawnNewJvm(jobId, env, t);
  42.         return;
  43.       }
  44. }
复制代码



14. JvmManagerForType的spawnNewJvm方法。重新启动一个jvm。
  1. private void spawnNewJvm(JobID jobId, JvmEnv env,  
  2.         TaskRunner t) {
  3.       JvmRunner jvmRunner = new JvmRunner(env,jobId);
  4.       jvmIdToRunner.put(jvmRunner.jvmId, jvmRunner);
  5.       jvmRunner.setDaemon(true);
  6.       jvmRunner.setName("JVM Runner " + jvmRunner.jvmId + " spawned.");
  7.       setRunningTaskForJvm(jvmRunner.jvmId, t);
  8.       LOG.info(jvmRunner.getName());
  9.       jvmRunner.start();
  10.     }
复制代码



15. JvmRunner线程的run方法。
  1.   public void run() {
  2.         runChild(env);
  3.       }
复制代码


16. JvmRunner线程的runChild方法。其中掉ShellCommandExecutor的execute方法。 ShellCommandExecutor封装了shell执行。即把前面步骤构造的JvmEnv类型的执行信息分装成一个字符串列表,使用该列表构造一 个ShellCommandExecutor来执行命令。
  1. public void runChild(JvmEnv env) {
  2. env.vargs.add(Integer.toString(jvmId.getId()));
  3.           List<String> wrappedCommand =
  4.             TaskLog.captureOutAndError(env.setup, env.vargs, env.stdout, env.stderr,
  5.                 env.logSize, env.pidFile);
  6.           shexec = new ShellCommandExecutor(wrappedCommand.toArray(new String[0]),
  7.               env.workDir, env.env);
  8.           shexec.execute();
  9. }
复制代码


完.





本文转载自:idouba

已有(2)人评论

跳转到指定楼层
yyk1017 发表于 2014-11-29 11:14:37
学习了,分析的好,以前一直不明白的,看了之后明白多了
回复

使用道具 举报

liusiping 发表于 2014-11-29 13:41:57
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条