分享

Spark之任务调度

问题导读
1、RDD的依赖关系是什么?
2、spark和MapReduce的设计理念区别是什么?






概述
Spark Application在遇到action算子时,SparkContext会生成Job,并将构成DAG图将给DAG Scheduler解析成Stage。

Stage有两种:
ShuffleMapStage
这种Stage是以Shuffle为输出边界
其输入边界可以是从外部获取数据,也可以是另一个ShuffleMapStage的输出
其输出可以是另一个Stage的开始
ShuffleMapStage的最后Task就是ShuffleMapTask
在一个Job里可能有该类型的Stage,也可以能没有该类型Stage。
ResultStage
这种Stage是直接输出结果
其输入边界可以是从外部获取数据,也可以是另一个ShuffleMapStage的输出
ResultStage的最后Task就是ResultTask
在一个Job里必定有该类型Stage。
一个Job含有一个或多个Stage,但至少含有一个ResultStage。

Scheduler模块整体架构
scheduler 模块主要分为两大部分:

TaskSchedulerListener。TaskSchedulerListener部分的主要功能是监听用户提交的job,将job分解为不同的类型的stage以及相应的task,并向TaskScheduler提交task。
TaskScheduler。TaskScheduler 接收用户提交的task并执行。而TaskScheduler根据部署的不同又分为三个子模块:
ClusterScheduler
LocalScheduler
MesosScheduler

TaskSchedulerListener
Spark抽象了 TaskSchedulerListener 并在其上实现了 DAGScheduler 。DAGScheduler 的主要功能是接收用户提交的job,将job根据类型划分为不同的stage,并在每一个stage内产生一系列的task,向 TaskScheduler 提交task。下面我们首先来看一下 TaskSchedulerListener 部分的类图:
1.png


用户所提交的job在得到 DAGScheduler 的调度后,会被包装成 ActiveJob,同时会启动 JobWaiter 阻塞监听job的完成状况。
于此同时依据job中 RDD 的dependency和dependency属性(NarrowDependency , ShufflerDependecy ), DAGScheduler 会根据依赖关系的先后产生出不同的stage DAG(result stage, shuffle map stage)。
在每一个stage内部,根据stage产生出相应的task,包括 ResultTask 或是ShuffleMapTask ,这些task会根据 RDD 中partition的数量和分布,产生出一组相应的task,并将其包装为 TaskSet 提交到 TaskScheduler 上去。

RDD的依赖关系和Stage的分类
在Spark中,每一个 RDD 是对于数据集在某一状态下的表现形式,而这个状态有可能是从前一状态转换而来的,因此换句话说这一个 RDD 有可能与之前的RDD(s) 有依赖关系。根据依赖关系的不同,可以将 RDD 分成两种不同的类型: Narrow Dependency 和 Wide Dependency 。

Narrow Dependency 指的是  child RDD 只依赖于 parent RDD(s) 固定数量的partition。
Wide Dependency 指的是 child RDD 的每一个partition都依赖于parent RDD(s) 所有partition。
它们之间的区别可参看下图:
1.png
根据 RDD 依赖关系的不同,Spark也将每一个job分为不同的stage,而stage之间的依赖关系则形成了DAG。对于 Narrow Dependency ,Spark会尽量多地将 RDD 转换放在同一个stage中;而对于 Wide Dependency ,由于Wide Dependency 通常意味着shuffle操作,因此Spark会将此stage定义ShuffleMapStage ,以便于向 MapOutputTracker 注册shuffle操作。对于stage的划分可参看下图,Spark通常将shuffle操作定义为stage的边界。
1.png


DAGScheduler
在用户创建 SparkContext 对象时,Spark会在内部创建 DAGScheduler 对象,并根据用户的部署情况,绑定不同的 TaskSechduler ,并启动DAGcheduler
  1. private var taskScheduler: TaskScheduler = {
  2.     //...
  3. }
  4. taskScheduler.start()
  5. private var dagScheduler = new DAGScheduler(taskScheduler)
  6. dagScheduler.start()
复制代码


而 DAGScheduler 的启动会在内部创建daemon线程,daemon线程调用run() 从block queue中取出event进行处理。
  1. private def run() {
  2.   SparkEnv.set(env)
  3.   while (true) {
  4.     val event = eventQueue.poll(POLL_TIMEOUT, TimeUnit.MILLISECONDS)
  5.     if (event != null) {
  6.       logDebug("Got event of type " + event.getClass.getName)
  7.     }
  8.     if (event != null) {
  9.       if (processEvent(event)) {
  10.         return
  11.       }
  12.     }
  13.     val time = System.currentTimeMillis() // TODO: use a pluggable clock for testability
  14.     if (failed.size > 0 && time > lastFetchFailureTime + RESUBMIT_TIMEOUT) {
  15.       resubmitFailedStages()
  16.     } else {
  17.       submitWaitingStages()
  18.     }
  19.   }
  20. }
复制代码


而 run() 会调用 processEvent 来处理不同的event。
DAGScheduler 处理的event包括:
JobSubmitted
CompletionEvent
ExecutorLost
TaskFailed
StopDAGScheduler
根据event的不同调用不同的方法去处理。

本质上 DAGScheduler 是一个生产者-消费者模型,用户和 TaskSchduler 产生event将其放入block queue,daemon线程消费event并处理相应事件。

Job的生与死
既然用户提交的job最终会交由 DAGScheduler 去处理,那么我们就来研究一下DAGScheduler 处理job的整个流程。在这里我们分析两种不同类型的job的处理流程。

1.没有shuffle和reduce的job
  1. val textFile = sc.textFile("README.md")
  2. textFile.filter(line => line.contains("Spark")).count()
复制代码


2.有shuffle和reduce的job
  1. val textFile = sc.textFile("README.md")
  2. textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
复制代码


首先在对 RDD 的 count() 和 reduceByKey() 操作都会调用SparkContext 的 runJob() 来提交job,而 SparkContext 的 runJob() 最终会调用 DAGScheduler 的 runJob() :

  1. def runJob[T, U: ClassManifest](
  2.     finalRdd: RDD[T],
  3.     func: (TaskContext, Iterator[T]) => U,
  4.     partitions: Seq[Int],
  5.     callSite: String,
  6.     allowLocal: Boolean,
  7.     resultHandler: (Int, U) => Unit)
  8. {
  9.   if (partitions.size == 0) {
  10.     return
  11.   }
  12.   val (toSubmit, waiter) = prepareJob(
  13.       finalRdd, func, partitions, callSite, allowLocal, resultHandler)
  14.   eventQueue.put(toSubmit)
  15.   waiter.awaitResult() match {
  16.     case JobSucceeded => {}
  17.     case JobFailed(exception: Exception) =>
  18.       logInfo("Failed to run " + callSite)
  19.       throw exception
  20.   }
  21. }
复制代码


runJob() 会调用 prepareJob() 对job进行预处理,封装成 JobSubmitted事件,放入queue中,并阻塞等待job完成。

当daemon线程的 processEvent() 从queue中取出 JobSubmitted 事件后,会根据job划分出不同的stage,并且提交stage:
  1. case JobSubmitted(finalRDD, func, partitions, allowLocal, callSite, listener) =>
  2.   val runId = nextRunId.getAndIncrement()
  3.   val finalStage = newStage(finalRDD, None, runId)
  4.   val job = new ActiveJob(runId, finalStage, func, partitions, callSite, listener)
  5.   clearCacheLocs()
  6.   if (allowLocal && finalStage.parents.size == 0 && partitions.length == 1) {
  7.     runLocally(job)
  8.   } else {
  9.     activeJobs += job
  10.     resultStageToJob(finalStage) = job
  11.     submitStage(finalStage)
  12.   }
复制代码


首先,对于任何的job都会产生出一个 finalStage 来产生和提交task。其次对于某些简单的job,它没有依赖关系,并且只有一个partition,这样的job会使用local thread处理而并非提交到 TaskScheduler 上处理。

接下来产生 finalStage 后,需要调用 submitStage() ,它根据stage之间的依赖关系得出stage DAG,并以依赖关系进行处理:
  1. private def submitStage(stage: Stage) {
  2.   if (!waiting(stage) && !running(stage) && !failed(stage)) {
  3.     val missing = getMissingParentStages(stage).sortBy(_.id)
  4.     if (missing == Nil) {
  5.       submitMissingTasks(stage)
  6.       running += stage
  7.     } else {
  8.       for (parent <- missing) {
  9.         submitStage(parent)
  10.       }
  11.       waiting += stage
  12.     }
  13.   }
  14. }
复制代码


对于新提交的job, finalStage 的parent stage还未获得,因此submitStage 会调用 getMissingParentStages() 来获得依赖关系:
  1. private def getMissingParentStages(stage: Stage): List[Stage] = {
  2.   val missing = new HashSet[Stage]
  3.   val visited = new HashSet[RDD[_]]
  4.   def visit(rdd: RDD[_]) {
  5.     if (!visited(rdd)) {
  6.       visited += rdd
  7.       if (getCacheLocs(rdd).contains(Nil)) {
  8.         for (dep <- rdd.dependencies) {
  9.           dep match {
  10.             case shufDep: ShuffleDependency[_,_] =>
  11.               val mapStage = getShuffleMapStage(shufDep, stage.priority)
  12.               if (!mapStage.isAvailable) {
  13.                 missing += mapStage
  14.               }
  15.             case narrowDep: NarrowDependency[_] =>
  16.               visit(narrowDep.rdd)
  17.           }
  18.         }
  19.       }
  20.     }
  21.   }
  22.   visit(stage.rdd)
  23.   missing.toList
  24. }
复制代码


这里parent stage是通过 RDD 的依赖关系递归遍历获得。对于Wide Dependecy 也就是 Shuffle Dependecy ,Spark会产生新的 mapStage作为 finalStage 的parent,而对于 Narrow Dependecy  Spark则不会产生新的stage。这里对stage的划分是按照上面提到的作为划分依据的,因此对于本段开头提到的两种job,第一种job只会产生一个 finalStage ,而第二种job会产生finalStage 和 mapStage 。

当stage DAG产生以后,针对每个stage需要产生task去执行,故在这会调用submitMissingTasks() :
  1. private def submitMissingTasks(stage: Stage) {
  2.   val myPending = pendingTasks.getOrElseUpdate(stage, new HashSet)
  3.   myPending.clear()
  4.   var tasks = ArrayBuffer[Task[_]]()
  5.   if (stage.isShuffleMap) {
  6.     for (p <- 0 until stage.numPartitions if stage.outputLocs(p) == Nil) {
  7.       val locs = getPreferredLocs(stage.rdd, p)
  8.       tasks += new ShuffleMapTask(stage.id, stage.rdd, stage.shuffleDep.get, p, locs)
  9.     }
  10.   } else {
  11.     val job = resultStageToJob(stage)
  12.     for (id <- 0 until job.numPartitions if (!job.finished(id))) {
  13.       val partition = job.partitions(id)
  14.       val locs = getPreferredLocs(stage.rdd, partition)
  15.       tasks += new ResultTask(stage.id, stage.rdd, job.func, partition, locs, id)
  16.     }
  17.   }
  18.   if (tasks.size > 0) {
  19.     myPending ++= tasks
  20.     taskSched.submitTasks(
  21.       new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.priority))
  22.     if (!stage.submissionTime.isDefined) {
  23.       stage.submissionTime = Some(System.currentTimeMillis())
  24.     }
  25.   } else {
  26.     running -= stage
  27.   }
  28. }
复制代码


首先根据stage所依赖的 RDD 的partition的分布,会产生出与partition数量相等的task,这些task根据partition的locality进行分布;其次对于 finalStage 或是mapStage 会产生不同的task;最后所有的task会封装到 TaskSet 内提交到TaskScheduler 去执行。

至此job在 DAGScheduler 内的启动过程全部完成,交由 TaskScheduler 执行task,当task执行完后会将结果返回给 DAGScheduler , DAGScheduler 调用handleTaskComplete() 处理task返回:
  1. private def handleTaskCompletion(event: CompletionEvent) {
  2.   val task = event.task
  3.   val stage = idToStage(task.stageId)
  4.   def markStageAsFinished(stage: Stage) = {
  5.     val serviceTime = stage.submissionTime match {
  6.       case Some(t) => "%.03f".format((System.currentTimeMillis() - t) / 1000.0)
  7.       case _ => "Unkown"
  8.     }
  9.     logInfo("%s (%s) finished in %s s".format(stage, stage.origin, serviceTime))
  10.     running -= stage
  11.   }
  12.   event.reason match {
  13.     case Success =>
  14.         ...
  15.       task match {
  16.         case rt: ResultTask[_, _] =>
  17.           ...
  18.         case smt: ShuffleMapTask =>
  19.           ...
  20.       }
  21.     case Resubmitted =>
  22.       ...
  23.     case FetchFailed(bmAddress, shuffleId, mapId, reduceId) =>
  24.       ...
  25.     case other =>
  26.       abortStage(idToStage(task.stageId), task + " failed: " + other)
  27.   }
  28. }
复制代码

每个执行完成的task都会将结果返回给 DAGScheduler , DAGScheduler 根据返回结果来进行进一步的动作。

RDD的计算
RDD 的计算是在task中完成的。我们之前提到task分为 ResultTask 和ShuffleMapTask ,我们分别来看一下这两种task具体的执行过程。

ResultTask
  1. override def run(attemptId: Long): U = {
  2.     val context = new TaskContext(stageId, partition, attemptId)
  3.     try {
  4.       func(context, rdd.iterator(split, context))
  5.     } finally {
  6.       context.executeOnCompleteCallbacks()
  7.     }
  8.   }
复制代码


ShuffleMapTask
  1. override def run(attemptId: Long): MapStatus = {
  2.     val numOutputSplits = dep.partitioner.numPartitions
  3.     val taskContext = new TaskContext(stageId, partition, attemptId)
  4.     try {
  5.       val buckets = Array.fill(numOutputSplits)(new ArrayBuffer[(Any, Any)])
  6.       for (elem <- rdd.iterator(split, taskContext)) {
  7.         val pair = elem.asInstanceOf[(Any, Any)]
  8.         val bucketId = dep.partitioner.getPartition(pair._1)
  9.         buckets(bucketId) += pair
  10.       }
  11.       val compressedSizes = new Array[Byte](numOutputSplits)
  12.       val blockManager = SparkEnv.get.blockManager
  13.       for (i <- 0 until numOutputSplits) {
  14.         val blockId = "shuffle_" + dep.shuffleId + "_" + partition + "_" + i
  15.         val iter: Iterator[(Any, Any)] = buckets(i).iterator
  16.         val size = blockManager.put(blockId, iter, StorageLevel.DISK_ONLY, false)
  17.         compressedSizes(i) = MapOutputTracker.compressSize(size)
  18.       }
  19.       return new MapStatus(blockManager.blockManagerId, compressedSizes)
  20.     } finally {
  21.       taskContext.executeOnCompleteCallbacks()
  22.     }
  23.   }
复制代码


ResultTask 和 ShuffleMapTask 都会调用 RDD 的 iterator() 来计算和转换 RDD ,不同的是: ResultTask 转换完 RDD 后调用 func() 计算结果;而 ShufflerMapTask 则将其放入 blockManager 中用来shuffle。

RDD 的计算调用 iterator() , iterator() 在内部调用 compute() 从RDD 依赖关系的根开始计算:
  1. final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
  2.   if (storageLevel != StorageLevel.NONE) {
  3.     SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
  4.   } else {
  5.     computeOrReadCheckpoint(split, context)
  6.   }
  7. }
  8. private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] = {
  9.   if (isCheckpointed) {
  10.     firstParent[T].iterator(split, context)
  11.   } else {
  12.     compute(split, context)
  13.   }
  14. }
复制代码

至此大致分析了 TaskSchedulerListener ,包括 DAGScheduler 内部的结构,job生命周期内的活动, RDD 是何时何地计算的。接下来我们分析一下task在TaskScheduler 内干了什么。

TaskScheduler

前面也提到了Spark实现了三种不同的 TaskScheduler ,包括LocalSheduler 、 ClusterScheduler 和 MesosScheduler 。LocalSheduler 是一个在本地执行的线程池, DAGScheduler 提交的所有task会在线程池中被执行,并将结果返回给 DAGScheduler 。 MesosScheduler 依赖于Mesos进行调度,笔者对Mesos了解甚少,因此不做分析。故此章节主要分析ClusterScheduler 模块。

ClusterScheduler 模块与deploy模块和executor模块耦合较为紧密,因此在分析 ClUsterScheduler 时也会顺带介绍deploy和executor模块。

首先我们来看一下 ClusterScheduler 的类图:
1.png

ClusterScheduler 的启动会伴随 SparkDeploySchedulerBackend 的启动,而backend会将自己分为两个角色:首先是driver,driver是一个local运行的actor,负责与remote的executor进行通行,提交任务,控制executor;其次是StandaloneExecutorBackend ,Spark会在每一个slave node上启动一个StandaloneExecutorBackend 进程,负责执行任务,返回执行结果。

ClusterScheduler的启动
在 SparkContext 实例化的过程中, ClusterScheduler 被随之实例化,同时赋予其 SparkDeploySchedulerBackend :
  1.   master match {
  2.       ...
  3.     case SPARK_REGEX(sparkUrl) =>
  4.       val scheduler = new ClusterScheduler(this)
  5.       val backend = new SparkDeploySchedulerBackend(scheduler, this, sparkUrl, appName)
  6.       scheduler.initialize(backend)
  7.       scheduler
  8.     case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
  9.       ...
  10.     case _ =>
  11.       ...
  12.   }
  13. }
  14. taskScheduler.start()
复制代码


ClusterScheduler 的启动会启动 SparkDeploySchedulerBackend ,同时启动daemon进程来检查speculative task:
  1. override def start() {
  2.   backend.start()
  3.   if (System.getProperty("spark.speculation", "false") == "true") {
  4.     new Thread("ClusterScheduler speculation check") {
  5.       setDaemon(true)
  6.       override def run() {
  7.         while (true) {
  8.           try {
  9.             Thread.sleep(SPECULATION_INTERVAL)
  10.           } catch {
  11.             case e: InterruptedException => {}
  12.           }
  13.           checkSpeculatableTasks()
  14.         }
  15.       }
  16.     }.start()
  17.   }
  18. }
复制代码


SparkDeploySchedulerBacked 的启动首先会调用父类的 start() ,接着它会启动client,并由client连接到master向每一个node的worker发送请求启动StandaloneExecutorBackend 。这里的client、master、worker涉及到了deploy模块,暂时不做具体介绍。而 StandaloneExecutorBackend 则涉及到了executor模块,它主要的功能是在每一个node创建task可以运行的环境,并让task在其环境中运行。
  1. override def start() {
  2.   super.start()
  3.   val driverUrl = "akka://spark@%s:%s/user/%s".format(
  4.     System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"),
  5.     StandaloneSchedulerBackend.ACTOR_NAME)
  6.   val args = Seq(driverUrl, "", "", "")
  7.   val command = Command("spark.executor.StandaloneExecutorBackend", args, sc.executorEnvs)
  8.   val sparkHome = sc.getSparkHome().getOrElse(
  9.     throw new IllegalArgumentException("must supply spark home for spark standalone"))
  10.   val appDesc = new ApplicationDescription(appName, maxCores, executorMemory, command, sparkHome)
  11.   client = new Client(sc.env.actorSystem, master, appDesc, this)
  12.   client.start()
  13. }
复制代码


在 StandaloneSchedulerBackend 中会创建 DriverActor ,它就是local的driver,以actor的方式与remote的executor进行通信。
  1. override def start() {
  2.   val properties = new ArrayBuffer[(String, String)]
  3.   val iterator = System.getProperties.entrySet.iterator
  4.   while (iterator.hasNext) {
  5.     val entry = iterator.next
  6.     val (key, value) = (entry.getKey.toString, entry.getValue.toString)
  7.     if (key.startsWith("spark.")) {
  8.       properties += ((key, value))
  9.     }
  10.   }
  11.   driverActor = actorSystem.actorOf(
  12.     Props(new DriverActor(properties)), name = StandaloneSchedulerBackend.ACTOR_NAME)
  13. }
复制代码


在client实例化之前,会将 StandaloneExecutorBackend 的启动环境作为参数传递给client,而client启动时会将此提交给master,由master分发给所有node上的worker,worker会配置环境并创建进程启动 StandaloneExecutorBackend 。

至此 ClusterScheduler 的启动,local driver的创建,remote executor环境的启动所有过程都已结束, ClusterScheduler 等待 DAGScheduler 提交任务。

ClusterScheduler提交任务
DAGScheduler 会调用 ClusterScheduler 提交任务,任务会被包装成TaskSetManager 并等待调度:
  1. override def submitTasks(taskSet: TaskSet) {
  2.   val tasks = taskSet.tasks
  3.   logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
  4.   this.synchronized {
  5.     val manager = new TaskSetManager(this, taskSet)
  6.     activeTaskSets(taskSet.id) = manager
  7.     activeTaskSetsQueue += manager
  8.     taskSetTaskIds(taskSet.id) = new HashSet[Long]()
  9.     if (hasReceivedTask == false) {
  10.       starvationTimer.scheduleAtFixedRate(new TimerTask() {
  11.         override def run() {
  12.           if (!hasLaunchedTask) {
  13.             logWarning("Initial job has not accepted any resources; " +
  14.               "check your cluster UI to ensure that workers are registered")
  15.           } else {
  16.             this.cancel()
  17.           }
  18.         }
  19.       }, STARVATION_TIMEOUT, STARVATION_TIMEOUT)
  20.     }
  21.     hasReceivedTask = true;
  22.   }
  23.   backend.reviveOffers()
  24. }
复制代码


在任务提交的同时会启动定时器,如果任务还未被执行,定时器持续发出警告直到任务被执行。同时会调用 StandaloneSchedulerBackend 的reviveOffers() ,而它则会通过actor向driver发送 ReviveOffers ,driver收到 ReviveOffers 后调用 makeOffers() :
  1. // Make fake resource offers on just one executor
  2. def makeOffers(executorId: String) {
  3.   launchTasks(scheduler.resourceOffers(
  4.     Seq(new WorkerOffer(executorId, executorHost(executorId), freeCores(executorId)))))
  5. }
  6. // Launch tasks returned by a set of resource offers
  7. def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
  8.   for (task <- tasks.flatten) {
  9.     freeCores(task.executorId) -= 1
  10.     executorActor(task.executorId) ! LaunchTask(task)
  11.   }
  12. }
复制代码

makeOffers() 会向 ClusterScheduler 申请资源,并向executor提交LauchTask 请求。

接下来 LaunchTask 会进入executor模块, StandaloneExecutorBackend在收到 LaunchTask 请求后会调用 Executor 执行task:
  1. override def receive = {
  2.   case RegisteredExecutor(sparkProperties) =>
  3.     ...  
  4.   case RegisterExecutorFailed(message) =>
  5.     ...
  6.   case LaunchTask(taskDesc) =>
  7.     logInfo("Got assigned task " + taskDesc.taskId)
  8.     executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask)
  9.   case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) =>
  10.     ...
  11. }
  12. def launchTask(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer) {
  13.   threadPool.execute(new TaskRunner(context, taskId, serializedTask))
  14. }
复制代码


Executor 内部是一个线程池,每一个提交的task都会包装为 TaskRunner 交由threadpool执行:
  1. class TaskRunner(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer)
  2.   extends Runnable {
  3.   override def run() {
  4.     SparkEnv.set(env)
  5.     Thread.currentThread.setContextClassLoader(urlClassLoader)
  6.     val ser = SparkEnv.get.closureSerializer.newInstance()
  7.     logInfo("Running task ID " + taskId)
  8.     context.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
  9.     try {
  10.       SparkEnv.set(env)
  11.       Accumulators.clear()
  12.       val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask)
  13.       updateDependencies(taskFiles, taskJars)
  14.       val task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)
  15.       logInfo("Its generation is " + task.generation)
  16.       env.mapOutputTracker.updateGeneration(task.generation)
  17.       val value = task.run(taskId.toInt)
  18.       val accumUpdates = Accumulators.values
  19.       val result = new TaskResult(value, accumUpdates)
  20.       val serializedResult = ser.serialize(result)
  21.       logInfo("Serialized size of result for " + taskId + " is " + serializedResult.limit)
  22.       context.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
  23.       logInfo("Finished task ID " + taskId)
  24.     } catch {
  25.       case ffe: FetchFailedException => {
  26.         val reason = ffe.toTaskEndReason
  27.         context.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
  28.       }
  29.       case t: Throwable => {
  30.         val reason = ExceptionFailure(t)
  31.         context.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
  32.         // TODO: Should we exit the whole executor here? On the one hand, the failed task may
  33.         // have left some weird state around depending on when the exception was thrown, but on
  34.         // the other hand, maybe we could detect that when future tasks fail and exit then.
  35.         logError("Exception in task ID " + taskId, t)
  36.         //System.exit(1)
  37.       }
  38.     }
  39.   }
  40. }
复制代码

其中 task.run() 则真正执行了task中的任务,如前 RDD的计算 章节所述。返回值被包装成 TaskResult 返回。

至此task在 ClusterScheduler 内运行的流程有了一个大致的介绍,当然这里略掉了许多异常处理的分支,但这不影响我们对主线的了解。

END
至此对Spark的Scheduler模块的主线做了一个顺藤摸瓜式的介绍,Scheduler模块作为Spark最核心的模块之一,充分体现了Spark与MapReduce的不同之处,体现了Spark DAG思想的精巧和设计的优雅。

当然Spark的代码仍然在积极开发之中,当前的源码分析在过不久后可能会变得没有意义,但重要的是体会Spark区别于MapReduce的设计理念,以及DAG思想的应用。DAG作为对MapReduce框架的改进越来越受到大数据界的重视,hortonworks 也提出了类似DAG的框架 tez 作为对MapReduce的改进。

已有(10)人评论

跳转到指定楼层
雪夜归人 发表于 2015-10-29 09:21:49
这么好的帖子,顶一下
回复

使用道具 举报

buddhist 发表于 2015-10-31 09:15:30
楼主,问一下子,本帖中对应的是哪个版本的spark源码呢?
回复

使用道具 举报

freshru 发表于 2016-5-30 17:38:42
Job的生与死
既然用户提交的job最终会交由 DAGScheduler 去处理,那么我们就来研究一下DAGScheduler 处理job的整个流程。在这里我们分析两种不同类型的job的处理流程。

1.没有shuffle和reduce的job
val textFile = sc.textFile("README.md")
textFile.filter(line => line.contains("Spark")).count()
复制代码


2.有shuffle和reduce的job
val textFile = sc.textFile("README.md")
textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
复制代码


首先在对 RDD 的 count() 和 reduceByKey() 操作都会调用SparkContext 的 runJob() 来提交job,而 SparkContext 的 runJob() 最终会调用 DAGScheduler 的 runJob() :




reduceByKey为transformation操作,textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)执行完了之后应该还没有生成job,所以会有提交job吗?
回复

使用道具 举报

aboutSunFlower 发表于 2016-6-28 20:28:31
又学到了,楼主分析很详细哦,根据这个,我觉得我的sparkstreaming应该是finalStage,没有进行shuffle
回复

使用道具 举报

aboutSunFlower 发表于 2016-6-28 20:30:51
不过有一点没太搞明白,JobWaiter是在监控job的执行情况,如果jobFailed的话是不是会重新执行一遍?spark会保证所有的job都执行成功吗?
回复

使用道具 举报

aboutSunFlower 发表于 2016-6-28 20:32:23
aboutSunFlower 发表于 2016-6-28 20:30
不过有一点没太搞明白,JobWaiter是在监控job的执行情况,如果jobFailed的话是不是会重新执行一遍?spark会 ...

个人理解spark应该是有保证所有的job都执行成功的机制,比如结合checkpoint什么的
回复

使用道具 举报

aboutSunFlower 发表于 2016-6-28 21:22:43
认真学习了4遍
回复

使用道具 举报

deajosha 发表于 2016-8-10 08:30:38
mark  刚开始学习spark 感觉很强大
回复

使用道具 举报

w517424787 发表于 2016-12-28 09:58:31
freshru 发表于 2016-5-30 17:38
**** 作者被禁止或删除 内容自动屏蔽 ****

肯定是不会提交job的,必须要有Action,一般会是collect,foreach等来输出数据!
回复

使用道具 举报

12下一页
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条