分享

hadoop中YARN的作业生命周期及作业恢复

本帖最后由 howtodown 于 2014-1-31 16:03 编辑

作业生命周期

在正式讲解作业生命周期之前,先要了解MRAppMaster中作业表示方式,每个作业由若干干Map Task和Reduce Task组成,每个Task进一步由若干个TaskAttempt组成,Job、Task和TaskAttempt的生命周期均由一个状态机表示,具体可参考https://issues.apache.org/jira/browse/MAPREDUCE-279(附件中的图yarn-state-machine.job.png,yarn-state-machine.task.png和yarn-state-machine.task-attempt.png)



作业的创建入口在MRAppMaster类中,如下所示:


  1. public class MRAppMaster extends CompositeService {
  2.   public void start() {
  3.     ...
  4.     job = createJob(getConfig());//创建Job
  5.     JobEvent initJobEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT);
  6.     jobEventDispatcher.handle(initJobEvent);//发送JOB_INI,创建MapTask,ReduceTask
  7.     startJobs();//启动作业,这是后续一切动作的触发之源
  8.     ...
  9.   }
  10. protected Job createJob(Configuration conf) {
  11.   Job newJob =
  12.     new JobImpl(jobId, appAttemptID, conf, dispatcher.getEventHandler(),
  13.       taskAttemptListener, jobTokenSecretManager, fsTokens, clock,
  14.       completedTasksFromPreviousRun, metrics, committer, newApiCommitter,
  15.       currentUser.getUserName(), appSubmitTime, amInfos, context);
  16.   ((RunningAppContext) context).jobs.put(newJob.getID(), newJob);
  17.   dispatcher.register(JobFinishEvent.Type.class,
  18.     createJobFinishEventHandler());
  19.   return newJob;
  20.   }
  21. }
复制代码

(1)作业/任务初始化

JobImpl会接收到.JOB_INIT事件,然后触发作业状态从NEW变为INITED,并触发函数InitTransition(),该函数会创建MapTask和

ReduceTask,代码如下:


  1. public static class InitTransition
  2.   implements MultipleArcTransition<JobImpl, JobEvent, JobState> {
  3.   ...
  4.   createMapTasks(job, inputLength, taskSplitMetaInfo);
  5.   createReduceTasks(job);
  6.   ...
  7. }
复制代码

其中,createMapTasks函数实现如下:


  1. private void createMapTasks(JobImpl job, long inputLength,
  2.   TaskSplitMetaInfo[] splits) {
  3.   for (int i=0; i &lt; job.numMapTasks; ++i) {
  4.     TaskImpl task =
  5.       new MapTaskImpl(job.jobId, i,
  6.       job.eventHandler,
  7.       job.remoteJobConfFile,
  8.       job.conf, splits[i],
  9.       job.taskAttemptListener,
  10. job.committer, job.jobToken, job.fsTokens,
  11. job.clock, job.completedTasksFromPreviousRun,
  12. job.applicationAttemptId.getAttemptId(),
  13. job.metrics, job.appContext);
  14. job.addTask(task);
  15. }
  16. }
复制代码

(2)作业启动


  1. public class MRAppMaster extends CompositeService {
  2. protected void startJobs() {
  3. JobEvent startJobEvent = new JobEvent(job.getID(), JobEventType.JOB_START);
  4. dispatcher.getEventHandler().handle(startJobEvent);
  5. }
  6. }
复制代码

JobImpl会接收到.JOB_START事件,会触发作业状态从INITED变为RUNNING,并触发函数StartTransition(),进而触发Map Task和Reduce Task开始调度:


  1. public static class StartTransition
  2. implements SingleArcTransition&lt;JobImpl, JobEvent&gt; {
  3. public void transition(JobImpl job, JobEvent event) {
  4. job.scheduleTasks(job.mapTasks);
  5. job.scheduleTasks(job.reduceTasks);
  6. }
  7. }
复制代码

这之后,所有Map Task和Reduce Task各自负责各自的状态变化,ContainerAllocator模块会首先为Map Task申请资源,然后是Reduce Task,一旦一个Task获取到了资源,则会创建一个运行实例TaskAttempt,如果该实例运行成功,则Task运行成功,否则,Task还会启动下一个运行实例TaskAttempt,直到一个TaskAttempt运行成功或者达到尝试次数上限。当所有Task运行成功后,Job运行成功。一个运行成功的任务所经历的状态变化如下(不包含失败或者被杀死情况):

【总结】

本文分析只是起到抛砖引入的作用,读者如果感兴趣,可以自行更深入的研究以下内容:

(1)Job、Task和TaskAttempt状态机设计(分别在JobImpl、TaskImpl和TaskAttemptImpl中)

(2)在以下几种场景下,以上三个状态机的涉及到的变化:

1)  kill job

2)  kill task attempt

3)  fail task attempt

4)  container failed


5)  lose node


-------------------------------------------------------------------------------------------------------------------------------------------------------------------


作业恢复

在MRAppMaster中,记录日志是由服务JobHistoryEventHandler完成的,而作业恢复是由服务RecoveryService完成的。

同MRv1一样,MRv2也会对一些关键的事件记录日志,这主要有两个作用:(1)方便用户查看历史作业运行信息 (2)作业因故障重新启动后,可根据日志信息恢复之前已经运行完成的任务,以减少重新计算代价。

MRAppMaster采用的日志格式与MRv1一样,但有两个小的改动:

(1)实现方式不同。MRv1采用了同步记录日志的方式,也就是说,每发生一个行为,会记录一次日志,然后才可以执行下面的代码。由于YARN引入了基于事件的异步编程模型,因此,MRAppMaster也采用了异步方式记录日志。

(2)存储位置不同。尽管MRv1允许用户将作业日志存放到HDFS上,但默认是存储到本地的,MRAppMaster则不同,它直接将日志写到HDFS上,这样,当MRAppMaster失败后,另一个MRAppMaster启动时,可直接读取HDFS中上一个作业产生的日志,以恢复已经运行完成的任务。

当前MRAppMaster记录的日志事件包括以下几类:

MRAppMaster-job-event.jpg

作业恢复的过程是重新解析作业日志,以恢复各个任务运行状态的过程(重做日志),这是由RecoveryService完成的。如果用户将yarn.app.mapreduce.am.job.recovery.enable参数置为true(默认就是true),则MRAppMaster运行作业之前,首先会检查这是否是第一次运行该作业,如果不是,则从HDFS上读取上次运行的作业日志,并恢复作业的运行状态,然后才会按照正常流程执行。相关代码如下(在MRAppMaster类中):

  1. public void init(final Configuration conf) {
  2.   boolean recoveryEnabled = conf.getBoolean(
  3.     MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
  4.   boolean recoverySupportedByCommitter = committer.isRecoverySupported();
  5.   if (recoveryEnabled && recoverySupportedByCommitter
  6.     && appAttemptID.getAttemptId() > 1) {
  7.     LOG.info("Recovery is enabled. "
  8.       + "Will try to recover from previous life on best effort basis.");
  9.     recoveryServ = createRecoveryService(context);
  10.     addIfService(recoveryServ);
  11.     dispatcher = recoveryServ.getDispatcher();
  12.     clock = recoveryServ.getClock();
  13.     inRecovery = true;
  14.   }
  15. }
  16. public void start() {
  17.   if (inRecovery) {
  18.     completedTasksFromPreviousRun = recoveryServ.getCompletedTasks();
  19.     amInfos = recoveryServ.getAMInfos();
  20.   }
  21.   ……
  22. }
复制代码
来自群组: IT男人帮

没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条