分享

Spark源码分析(四)-Job提交过程

本帖最后由 52Pig 于 2014-10-5 21:38 编辑

问题导读:
1.Job的提交过程主要经过哪些对象的处理?

2.Dependency类的作用是什么?它包含哪些对象?
3.分析下DAGScheduler处理job的过程

4.gif

本文将以一个简单的WordCount为例来看看Job的提交过程
job1.png

由输出的日志可以看出job的提交过程主要经过了SparkContext-》DAGScheduler-》TaskScheduler的处理
job2.png

先从RDD入手,看看RDD的转化过程。在wordcount程序中一个README.md文件从HadoopRDD最终会转化为MapPartitionsRDD
job3.png

textFile()函数只是从hdfs、local file等读取文件转换成HadoopRDD,并通过map函数转化为了MappedRDD文件
job4.png

与RDD相关的一个重要类就是Dependency类,它负责表示RDD之间的依赖关系。包含了NarrowDependency(窄依赖)与ShuffleDependency(宽依赖)两类。其中NarrowDependency包含一对一的OneToOneDependency与一对多的RangeDependency。在wordcount程序中MappedRDD、FlatMappedRDD都属于OneToOneDependency,而ShuffledRDD、MapPartitionsRDD属于ShuffleDependency。
job5.png
job真正的执行入口是从count这个action开始的

job6.png

job提交的大致调用链是:sc.runJob()->dagScheduler.runJob->dagScheduler.submitJob->dagSchedulerEventProcessActor.JobSubmitted->dagScheduler.handleJobSubmitted->dagScheduler.submitStage->dagScheduler.submitMissingTasks->taskScheduler.submitTasks可以看出job先经过DAGScheduler生成stage,转换成TaskSet后提交给TaskScheduler进行调度。TaskScheduler工作原理在上一节已经分析过了,下面主要来分析下DAGScheduler处理job的过程:
job处理过程中handleJobSubmitted比较关键,finalRDD就是进行action操作前的最后一个RDD,对应wordcount就是MapPartitionsRDD。
  1. private[scheduler] def handleJobSubmitted(jobId: Int,
  2.       finalRDD: RDD[_],
  3.       func: (TaskContext, Iterator[_]) => _,
  4.       partitions: Array[Int],
  5.       allowLocal: Boolean,
  6.       callSite: String,
  7.       listener: JobListener,
  8.       properties: Properties = null)
  9.   {
  10.     var finalStage: Stage = null
  11.     try {
  12.       // New stage creation may throw an exception if, for example, jobs are run on a
  13.       // HadoopRDD whose underlying HDFS files have been deleted.
  14.       finalStage = newStage(finalRDD, partitions.size, None, jobId, Some(callSite))
  15.     } catch {
  16.       case e: Exception =>
  17.         logWarning("Creating new stage failed due to exception - job: " + jobId, e)
  18.         listener.jobFailed(e)
  19.         return
  20.     }
  21.     if (finalStage != null) {
  22.       val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties)
  23.       clearCacheLocs()
  24.       logInfo("Got job %s (%s) with %d output partitions (allowLocal=%s)".format(
  25.         job.jobId, callSite, partitions.length, allowLocal))
  26.       logInfo("Final stage: " + finalStage + "(" + finalStage.name + ")")
  27.       logInfo("Parents of final stage: " + finalStage.parents)
  28.       logInfo("Missing parents: " + getMissingParentStages(finalStage))
  29.       if (allowLocal && finalStage.parents.size == 0 && partitions.length == 1) {
  30.         // Compute very short actions like first() or take() with no parent stages locally.
  31.         listenerBus.post(SparkListenerJobStart(job.jobId, Array[Int](), properties))
  32.         runLocally(job)
  33.       } else {
  34.         jobIdToActiveJob(jobId) = job
  35.         activeJobs += job
  36.         resultStageToJob(finalStage) = job
  37.         listenerBus.post(SparkListenerJobStart(job.jobId, jobIdToStageIds(jobId).toArray,
  38.           properties))
  39.         submitStage(finalStage)
  40.       }
  41.     }
  42.     submitWaitingStages()
  43.   }
复制代码
getMissingParentStages函数中会根据finalstage对应finalRDD的dependence类型来创建它的父stage。由于MapPartitionsRDD属于ShuffleDependency,所以上面的日志截图中可以看出finalStage(stage 0)的父stage是(stage 1)
job7.png
submitStage函数中会根据依赖关系划分stage,通过递归调用从finalStage一直往前找它的父stage,直到stage没有父stage时就调用submitMissingTasks方法提交改stage。这样就完成了将job划分为一个或者多个stage。
job8.png
最后会在submitMissingTasks函数中将stage封装成TaskSet通过taskScheduler.submitTasks函数提交给TaskScheduler处理。

相关源码:





没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条