分享

Apache Spark源码走读之19 -- standalone cluster模式下资源的申请与释放

本帖最后由 pig2 于 2015-1-6 14:17 编辑

问题导读
1.构成Standalone cluster部署模式的四大组成部件有哪些?分别有什么功能?
2.WorkerInfo在schedule函数中会被使用到,schedule函数处理逻辑是怎样的?













概要
本文主要讲述在standalone cluster部署模式下,Spark Application在整个运行期间,资源(主要是cpu core和内存)的申请与释放。
构成Standalone cluster部署模式的四大组成部件如下图所示,分别为Master, worker, executor和driver,它们各自运行于独立的JVM进程。

下载.png
从资源管理的角度来说
  • Master  掌管整个cluster的资源,主要是指cpu core和memory,但Master自身并不拥有这些资源
  • Worker 计算资源的实际贡献者,须向Master汇报自身拥有多少cpu core和memory, 在master的指示下负责启动executor
  • Executor 执行真正计算的苦力,由master来决定该进程拥有的core和memory数值
  • Driver 资源的实际占用者,Driver会提交一到多个job,每个job在拆分成多个task之后,会分发到各个executor真正的执行
这些内容在standalone cluster模式下的容错性分析中也有所涉及,今天主要讲一下资源在分配之后不同场景下是如何被顺利回收的。

资源上报汇聚过程
standalone cluster下最主要的当然是master,master必须先于worker和driver程序正常启动。
当master顺利启动完毕,可以开始worker的启动工作,worker在启动的时候需要向master发起注册,在注册消息中带有本worker节点的cpu core和内存。
调用顺序如下preStart->registerWithMaster->tryRegisterAllMasters
看一看tryRegisterAllMasters的代码

  1. def tryRegisterAllMasters() {
  2.     for (masterUrl <- masterUrls) {
  3.       logInfo("Connecting to master " + masterUrl + "...")
  4.       val actor = context.actorSelection(Master.toAkkaUrl(masterUrl))
  5.       actor ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort, publicAddress)
  6.     }
  7.   }
复制代码


我们的疑问是RegisterWorker构造函数所需的参数memory和cores是从哪里获取的呢?
注意一下Worker中的main函数会创建WorkerArguments,

  1.   def main(argStrings: Array[String]) {
  2.     SignalLogger.register(log)
  3.     val args = new WorkerArguments(argStrings)
  4.     val (actorSystem, _) = startSystemAndActor(args.host, args.port, args.webUiPort, args.cores,
  5.       args.memory, args.masters, args.workDir)
  6.     actorSystem.awaitTermination()
  7.   }
复制代码

memory通过函数inferDefaultMemory获取,而cores通过inferDefaultCores获取。
  1. def inferDefaultCores(): Int = {
  2.     Runtime.getRuntime.availableProcessors()
  3.   }
  4.   def inferDefaultMemory(): Int = {
  5.     val ibmVendor = System.getProperty("java.vendor").contains("IBM")
  6.     var totalMb = 0
  7.     try {
  8.       val bean = ManagementFactory.getOperatingSystemMXBean()
  9.       if (ibmVendor) {
  10.         val beanClass = Class.forName("com.ibm.lang.management.OperatingSystemMXBean")
  11.         val method = beanClass.getDeclaredMethod("getTotalPhysicalMemory")
  12.         totalMb = (method.invoke(bean).asInstanceOf[Long] / 1024 / 1024).toInt
  13.       } else {
  14.         val beanClass = Class.forName("com.sun.management.OperatingSystemMXBean")
  15.         val method = beanClass.getDeclaredMethod("getTotalPhysicalMemorySize")
  16.         totalMb = (method.invoke(bean).asInstanceOf[Long] / 1024 / 1024).toInt
  17.       }
  18.     } catch {
  19.       case e: Exception => {
  20.         totalMb = 2*1024
  21.         System.out.println("Failed to get total physical memory. Using " + totalMb + " MB")
  22.       }
  23.     }
  24.     // Leave out 1 GB for the operating system, but don't return a negative memory size
  25.     math.max(totalMb - 1024, 512)
  26.   }
复制代码

如果已经在配置文件中为显示指定了每个worker的core和memory,则使用配置文件中的值,具体配置参数为SPARK_WORKER_CORES和SPARK_WORKER_MEMORY

Master在收到RegisterWork消息之后,根据上报的信息为每一个worker创建相应的WorkerInfo.

  1. case RegisterWorker(id, workerHost, workerPort, cores, memory, workerUiPort, publicAddress) =>
  2.     {
  3.       logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
  4.         workerHost, workerPort, cores, Utils.megabytesToString(memory)))
  5.       if (state == RecoveryState.STANDBY) {
  6.         // ignore, don't send response
  7.       } else if (idToWorker.contains(id)) {
  8.         sender ! RegisterWorkerFailed("Duplicate worker ID")
  9.       } else {
  10.         val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
  11.           sender, workerUiPort, publicAddress)
  12.         if (registerWorker(worker)) {
  13.           persistenceEngine.addWorker(worker)
  14.           sender ! RegisteredWorker(masterUrl, masterWebUiUrl)
  15.           schedule()
  16.         } else {
  17.           val workerAddress = worker.actor.path.address
  18.           logWarning("Worker registration failed. Attempted to re-register worker at same " +
  19.             "address: " + workerAddress)
  20.           sender ! RegisterWorkerFailed("Attempted to re-register worker at same address: "
  21.             + workerAddress)
  22.         }
  23.       }
复制代码


资源分配过程
如果在worker注册上来的时候,已经有Driver Application注册上来,那么就需要将原先处于未分配资源状态的driver application启动相应的executor。
WorkerInfo在schedule函数中会被使用到,schedule函数处理逻辑概述如下
  • 查看目前存活的worker中剩余的内存是否能够满足application每个task的最低需求,如果是则将该worker加入到可分配资源的队列
  • 根据分发策略,如果是决定将工作平摊到每个worker,则每次在一个worker上占用一个core,直到所有可分配资源耗尽或已经满足driver的需求
  • 如果分发策略是分发到尽可能少的worker,则一次占用尽worker上的可分配core,直到driver的core需求得到满足
  • 根据步骤2或3的结果在每个worker上添加相应的executor,处理函数是addExecutor

为了叙述简单,现仅列出平摊到各个worker的分配处理过程

  1. for (worker > workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) {
  2.         for (app <- waitingApps if app.coresLeft > 0) {
  3.           if (canUse(app, worker)) {
  4.             val coresToUse = math.min(worker.coresFree, app.coresLeft)
  5.             if (coresToUse > 0) {
  6.               val exec = app.addExecutor(worker, coresToUse)
  7.               launchExecutor(worker, exec)
  8.               app.state = ApplicationState.RUNNING
  9.             }
  10.           }
  11.         }
  12.       }
复制代码


launchExecutor主要负责两件事情
  • 记录下新添加的executor使用掉的cpu core和内存数目,记录过程发生在worker.addExecutor
  • 向worker发送LaunchExecutor指令

  1. def launchExecutor(worker: WorkerInfo, exec: ExecutorInfo) {
  2.     logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
  3.     worker.addExecutor(exec)
  4.     worker.actor ! LaunchExecutor(masterUrl,
  5.       exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory)
  6.     exec.application.driver ! ExecutorAdded(
  7.       exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)
  8.   }
复制代码


worker在收到LaunchExecutor指令后,也会记一笔账,将要使用掉的cpu core和memory从可用资源中减去,然后使用ExecutorRunner来负责生成Executor进程,注意Executor运行于独立的进程。代码如下

  1. case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
  2.       if (masterUrl != activeMasterUrl) {
  3.         logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
  4.       } else {
  5.         try {
  6.           logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
  7.           val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_,
  8.             self, workerId, host,
  9.             appDesc.sparkHome.map(userSparkHome => new File(userSparkHome)).getOrElse(sparkHome),
  10.             workDir, akkaUrl, conf, ExecutorState.RUNNING)
  11.           executors(appId + "/" + execId) = manager
  12.           manager.start()
  13.           coresUsed += cores_
  14.           memoryUsed += memory_
  15.           masterLock.synchronized {
  16.             master ! ExecutorStateChanged(appId, execId, manager.state, None, None)
  17.           }
  18.         } catch {
  19.           case e: Exception => {
  20.             logError("Failed to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
  21.             if (executors.contains(appId + "/" + execId)) {
  22.               executors(appId + "/" + execId).kill()
  23.               executors -= appId + "/" + execId
  24.             }
  25.             masterLock.synchronized {
  26.               master ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, None, None)
  27.             }
  28.           }
  29.         }
  30.       }
复制代码

在资源分配过程中需要注意到的是如果有多个Driver Application处于等待状态,资源分配的原则是FIFO,先到先得。

资源回收过程
worker中上报的资源最终被driver application中提交的job task所占用,如果application结束(包括正常和异常退出),application所占用的资源就应该被顺利回收,即将占用的资源重新归入可分配资源行列。
现在的问题转换成Master和Executor如何知道Driver Application已经退出了呢?
有两种不同的处理方式,一种是先道别后离开,一种是不告而别。现分别阐述。
何为先道别后离开,即driver application显式的通知master和executor,任务已经完成了,我要bye了。应用程序显式的调用SparkContext.stop

  1. def stop() {
  2.     postApplicationEnd()
  3.     ui.stop()
  4.     // Do this only if not stopped already - best case effort.
  5.     // prevent NPE if stopped more than once.
  6.     val dagSchedulerCopy = dagScheduler
  7.     dagScheduler = null
  8.     if (dagSchedulerCopy != null) {
  9.       metadataCleaner.cancel()
  10.       cleaner.foreach(_.stop())
  11.       dagSchedulerCopy.stop()
  12.       taskScheduler = null
  13.       // TODO: Cache.stop()?
  14.       env.stop()
  15.       SparkEnv.set(null)
  16.       ShuffleMapTask.clearCache()
  17.       ResultTask.clearCache()
  18.       listenerBus.stop()
  19.       eventLogger.foreach(_.stop())
  20.       logInfo("Successfully stopped SparkContext")
  21.     } else {
  22.       logInfo("SparkContext already stopped")
  23.     }
  24.   }
复制代码

显式调用SparkContext.stop的一个主要功能是会去显式的停止Executor,具体下达StopExecutor指令的代码见于CoarseGrainedSchedulerBackend中的stop函数
  1.   override def stop() {
  2.     stopExecutors()
  3.     try {
  4.       if (driverActor != null) {
  5.         val future = driverActor.ask(StopDriver)(timeout)
  6.         Await.ready(future, timeout)
  7.       }
  8.     } catch {
  9.       case e: Exception =>
  10.         throw new SparkException("Error stopping standalone scheduler's driver actor", e)
  11.     }
  12.   }
复制代码

那么Master又是如何知道Driver Application退出的呢?这要归功于Akka的通讯机制了,当相互通讯的任意一方异常退出,另一方都会收到DisassociatedEvent, Master也就是在这个消息处理中移除已经停止的Driver Application。
  1. case DisassociatedEvent(_, address, _) => {
  2.       // The disconnected client could've been either a worker or an app; remove whichever it was
  3.       logInfo(s"$address got disassociated, removing it.")
  4.       addressToWorker.get(address).foreach(removeWorker)
  5.       addressToApp.get(address).foreach(finishApplication)
  6.       if (state == RecoveryState.RECOVERING && canCompleteRecovery) { completeRecovery() }
  7.     }
复制代码


不告而别的方式下Executor是如何知道自己所服务的application已经顺利完成使命了呢?道理和master的一样,还是通过DisassociatedEvent来感知。详见CoarseGrainedExecutorBackend中的receive函数

  1.   case x: DisassociatedEvent =>
  2.       logError(s"Driver $x disassociated! Shutting down.")
  3.       System.exit(1)
复制代码


异常情况下的资源回收
由于Master和Worker之间的心跳机制,如果worker异常退出, Master会由心跳机制感知到其消亡,进而将其上报的资源移除。
Executor异常退出时,Worker中的监控线程ExecutorRunner会立即感知,进而上报给Master,Master会回收资源,并重新要求worker启动executor。


相关内容


Apache Spark源码走读之1 -- Spark论文阅读笔记

Apache Spark源码走读之2 -- Job的提交与运行

Apache Spark源码走读之3-- Task运行期之函数调用关系分析

Apache Spark源码走读之4 -- DStream实时流数据处理

Apache Spark源码走读之5-- DStream处理的容错性分析

Apache Spark源码走读之6-- 存储子系统分析

Apache Spark源码走读之7 -- Standalone部署方式分析

Apache Spark源码走读之8 -- Spark on Yarn

Apache Spark源码走读之9 -- Spark源码编译

Apache Spark源码走读之10 -- 在YARN上运行SparkPi

Apache Spark源码走读之11 -- sql的解析与执行

Apache Spark源码走读之12 -- Hive on Spark运行环境搭建

Apache Spark源码走读之13 -- hiveql on spark实现详解

Apache Spark源码走读之14 -- Graphx实现剖析

Apache Spark源码走读之15 -- Standalone部署模式下的容错性分析

Apache Spark源码走读之16 -- spark repl实现详解

Apache Spark源码走读之17 -- 如何进行代码跟读

Apache Spark源码走读之18 -- 使用Intellij idea调试Spark源码


Apache Spark源码走读之20 -- ShuffleMapTask计算结果的保存与读取

Apache Spark源码走读之21 -- WEB UI和Metrics初始化及数据更新过程分析

Apache Spark源码走读之22 -- 浅谈mllib中线性回归的算法实现

Apache Spark源码走读之23 -- Spark MLLib中拟牛顿法L-BFGS的源码实现

Apache Spark源码走读之24 -- Sort-based Shuffle的设计与实现



欢迎加入about云群90371779322273151432264021 ,云计算爱好者群,亦可关注about云腾讯认证空间||关注本站微信

已有(3)人评论

跳转到指定楼层
落魂草 发表于 2015-1-3 20:09:31
回复

使用道具 举报

fzleejm 发表于 2015-1-4 09:32:35
学习。。            
回复

使用道具 举报

355815741 发表于 2015-1-4 10:03:06
学习了,谢谢分享~
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条