分享

spark源码【 TaskScheduler】与任务提交原理浅析1

问题导读
1.本文TaskScheduler创建包含哪些过程?
2.TaskScheduler、TaskSchedulerImpl、SchedulerBackend之间的关系是什么?






引言
此篇将具体介绍一下任务创建和分发的过程,为了让逻辑更加清楚,我将分成几篇文章进行介绍,好保证简明清晰,逻辑连贯,前后统一。

TaskScheduler介绍
TaskScheduler的主要任务是提交taskset到集群运算并汇报结果。
具体而言:
  • 出现shuffle输出lost要报告fetch failed错误
  • 碰到straggle任务需要放到别的节点上重试
  • 为每个TaskSet维护一个TaskSetManager(追踪本地性及错误信息)




TaskScheduler创建
在SparkContext初始化时创建TaskScheduler和DAGScheduler。这里具体描述一下其创建过程。
SparkContext创建过程中会调用createTaskScheduler函数来启动TaskScheduler任务调度器:
[mw_shl_code=bash,true]// Create and start the scheduler
private[spark] var (schedulerBackend, taskScheduler) =
  SparkContext.createTaskScheduler(this, master)[/mw_shl_code]

createTaskScheduler函数中,TaskScheduler会根据部署方式而选择不同的SchedulerBackend来处理.
针对不同部署方式会有不同的TaskScheduler与SchedulerBackend进行组合:
  • Local模式:TaskSchedulerImpl + LocalBackend
  • Spark集群模式:TaskSchedulerImpl + SparkDepolySchedulerBackend
  • Yarn-Cluster模式:YarnClusterScheduler + CoarseGrainedSchedulerBackend
  • Yarn-Client模式:YarnClientClusterScheduler + YarnClientSchedulerBackend
[mw_shl_code=bash,true]/**
* Create a task scheduler based on a given master URL.
* Return a 2-tuple of the scheduler backend and the task scheduler.
*/
private def createTaskScheduler(
    sc: SparkContext,
    master: String): (SchedulerBackend, TaskScheduler) = {
  // Regular expression used for local[N] and local master formats
  val LOCAL_N_REGEX = """local\[([0-9]+|\*)\]""".r
  // Regular expression for local[N, maxRetries], used in tests with failing tasks
  val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+|\*)\s*,\s*([0-9]+)\]""".r
  // Regular expression for simulating a Spark cluster of [N, cores, memory] locally
  val LOCAL_CLUSTER_REGEX = """local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*]""".r
  // Regular expression for connecting to Spark deploy clusters
  val SPARK_REGEX = """spark://(.*)""".r
  // Regular expression for connection to Mesos cluster by mesos:// or zk:// url
  val MESOS_REGEX = """(mesos|zk)://.*""".r
  // Regular expression for connection to Simr cluster
  val SIMR_REGEX = """simr://(.*)""".r
  // When running locally, don't try to re-execute tasks on failure.
  val MAX_LOCAL_TASK_FAILURES = 1
  master match {
    case "local" =>
      val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
      val backend = new LocalBackend(scheduler, 1)
      scheduler.initialize(backend)
      (backend, scheduler)
    case LOCAL_N_REGEX(threads) =>
      ...
    case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
      ...
    case SPARK_REGEX(sparkUrl) =>
      val scheduler = new TaskSchedulerImpl(sc)
      val masterUrls = sparkUrl.split(",").map("spark://" + _)
      val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
      scheduler.initialize(backend)
      (backend, scheduler)
    case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
      // Check to make sure memory requested <= memoryPerSlave. Otherwise Spark will just hang.
      val memoryPerSlaveInt = memoryPerSlave.toInt
      if (sc.executorMemory > memoryPerSlaveInt) {
        throw new SparkException(
          "Asked to launch cluster with %d MB RAM / worker but requested %d MB/worker".format(
            memoryPerSlaveInt, sc.executorMemory))
      }
      val scheduler = new TaskSchedulerImpl(sc)
      val localCluster = new LocalSparkCluster(
        numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt, sc.conf)
      val masterUrls = localCluster.start()
      val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
      scheduler.initialize(backend)
      backend.shutdownCallback = (backend: SparkDeploySchedulerBackend) => {
        localCluster.stop()
      }
      (backend, scheduler)
      .....[/mw_shl_code]

以Standalone模式为例,backend根据不同的部署方式实例化,后又作为scheduler对象的一个成员变量对scheduler调用initialize函数:[mw_shl_code=bash,true]case SPARK_REGEX(sparkUrl) =>
        val scheduler = new TaskSchedulerImpl(sc)
        val masterUrls = sparkUrl.split(",").map("spark://" + _)
        val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
        scheduler.initialize(backend)
        (backend, scheduler)[/mw_shl_code]

TaskScheduler、TaskSchedulerImpl、SchedulerBackend之间的关系
TaskScheduler类负责任务调度资源的分配,SchedulerBackend负责与Master、Worker通信收集Worker上分配给该应用使用的资源情况。
下图描述了TaskScheduler、TaskSchedulerImpl、SchedulerBackend之间的UML关系,其中TaskSchedulerImpl是task schduler的具体实现,其中混入了TaskScheduler特质,而SparkDeploySchedulerBackend等具体的资源收集类继承自CoarseGrainedSchedulerBackend这一父类,而CoarseGrainedSchedulerBackend混入了SchedulerBackend特质:

302120480557843.png

这里还是以Spark Standalone集群模式为例,分析TaskSchedulerImpl与SparkDepolySchedulerBackend类中的具体操作。
  • 资源信息收集
    SparkDepolySchedulerBackend类就是专门负责收集Worker的资源信息,在它的父类CoarseGrainedSchedulerBackend中的DriverActor就是与Worker通信的Actor。
    Worker启动后会向Driver发送RegisterExecutor消息,此消息中就包含了Executor为Application分配的计算资源信息,而接收该消息的Actor也正是DriverActor。
  • 资源分配
    TaskSchedulerImpl类就是负责为Task分配资源的。在CoarseGrainedSchedulerBackend获取到可用资源后就会通过makeOffers方法通知TaskSchedulerImpl对资源进行分配,TaskSchedulerImpl的resourceOffers方法就是负责为Task分配计算资源的,在为Task分配好资源后又会通过lauchTasks方法发送LaunchTask消息通知Worker上的Executor执行Task。


TaskScheduler创建中函数调用链
SparkContext的createTaskScheduler创建schedulerBackend和taskScheduler—>根据不同的调度方式选择具体的scheduler和backend构造器—>调用TaskSchedulerImpl的initialize方法为scheduler的成员变量backend赋值—>createTaskScheduler返回创建好的(schedulerBackend, taskScheduler)—>调用TaskScheduler.start()启动—>实际上在TaskSchedulerImpl的start方法中调用backend.start()来启动SchedulerBackend。
TaskScheduler是在Application执行过程中,为它进行任务调度的,是属于Driver侧的。对应于一个Application就会有一个TaskScheduler,TaskScheduler和Application是一一对应的。TaskScheduler对资源的控制也比较鲁棒,一个Application申请Worker的计算资源,只要Application不结束就会一直被占有。 

小结
这一篇文章,我们介绍了TaskScheduler的创建过程,TaskScheduler、TaskSchedulerImpl、SchedulerBackend之间的关系还有创建过程的调用链,给大家一个初始印象。在下一篇中,我将承接Stage划分完毕后进行task创建和分发流程,进行细致的介绍。


已有(2)人评论

跳转到指定楼层
hb1984 发表于 2015-8-16 09:59:00
谢谢楼主分享。            
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条