分享

Spark源码系列(七)Spark on yarn具体实现

xioaxu790 2014-9-13 18:27:56 发表于 连载型 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 0 25124
本帖最后由 pig2 于 2014-9-14 12:40 编辑
问题导读
1、yarn提交作业的流程是怎样的?
2、run方法在ApplicationMaster里面主要干了什么工作?
3、把作业发布到yarn上面去执行,涉及到哪些类?





本来不打算写的了,但是真的是闲来无事,整天看美剧也没啥意思。这一章打算讲一下Spark on yarn的实现,1.0.0里面已经是一个stable的版本了,可是1.0.1也出来了,离1.0.0发布才一个月的时间,更新太快了,节奏跟不上啊,这里仍旧是讲1.0.0的代码,所以各位朋友也不要再问我讲的是哪个版本,目前为止发布的文章都是基于1.0.0的代码。

在第一章《spark-submit提交作业过程》的时候,我们讲过Spark on yarn的在cluster模式下它的main class是org.apache.spark.deploy.yarn.Client。okay,这个就是我们的头号目标。

提交作业
找到main函数,里面调用了run方法,我们直接看run方法。
  1.     val appId = runApp()
  2.     monitorApplication(appId)
  3.     System.exit(0)
复制代码



运行App,跟踪App,最后退出。我们先看runApp吧。
  1.   def runApp(): ApplicationId = {
  2.     // 校验参数,内存不能小于384Mb,Executor的数量不能少于1个。
  3.     validateArgs()
  4.     // 这两个是父类的方法,初始化并且启动Client
  5.     init(yarnConf)
  6.     start()
  7.     // 记录集群的信息(e.g, NodeManagers的数量,队列的信息).
  8.     logClusterResourceDetails()
  9.     // 准备提交请求到ResourcManager (specifically its ApplicationsManager (ASM)// Get a new client application.
  10.     val newApp = super.createApplication()
  11.     val newAppResponse = newApp.getNewApplicationResponse()
  12.     val appId = newAppResponse.getApplicationId()
  13.     // 检查集群的内存是否满足当前的作业需求
  14.     verifyClusterResources(newAppResponse)
  15.     // 准备资源和环境变量.
  16.     //1.获得工作目录的具体地址: /.sparkStaging/appId/
  17.     val appStagingDir = getAppStagingDir(appId)
  18.   //2.创建工作目录,设置工作目录权限,上传运行时所需要的jar包
  19.     val localResources = prepareLocalResources(appStagingDir)
  20.     //3.设置运行时需要的环境变量
  21.     val launchEnv = setupLaunchEnv(localResources, appStagingDir)
  22.   //4.设置运行时JVM参数,设置SPARK_USE_CONC_INCR_GC为true的话,就使用CMS的垃圾回收机制
  23.     val amContainer = createContainerLaunchContext(newAppResponse, localResources, launchEnv)
  24.     // 设置application submission context.
  25.     val appContext = newApp.getApplicationSubmissionContext()
  26.     appContext.setApplicationName(args.appName)
  27.     appContext.setQueue(args.amQueue)
  28.     appContext.setAMContainerSpec(amContainer)
  29.     appContext.setApplicationType("SPARK")
  30.     // 设置ApplicationMaster的内存,Resource是表示资源的类,目前有CPU和内存两种.
  31.     val memoryResource = Records.newRecord(classOf[Resource]).asInstanceOf[Resource]
  32.     memoryResource.setMemory(args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
  33.     appContext.setResource(memoryResource)
  34.     // 提交Application.
  35.     submitApp(appContext)
  36.     appId
  37.   }
复制代码


monitorApplication就不说了,不停的调用getApplicationReport方法获得最新的Report,然后调用getYarnApplicationState获取当前状态,如果状态为FINISHED、FAILED、KILLED就退出。

说到这里,顺便把跟yarn相关的参数也贴出来一下,大家一看就清楚了。
  1.     while (!args.isEmpty) {
  2.       args match {
  3.         case ("--jar") :: value :: tail =>
  4.           userJar = value
  5.           args = tail
  6.         case ("--class") :: value :: tail =>
  7.           userClass = value
  8.           args = tail
  9.         case ("--args" | "--arg") :: value :: tail =>
  10.           if (args(0) == "--args") {
  11.             println("--args is deprecated. Use --arg instead.")
  12.           }
  13.           userArgsBuffer += value
  14.           args = tail
  15.         case ("--master-class" | "--am-class") :: value :: tail =>
  16.           if (args(0) == "--master-class") {
  17.             println("--master-class is deprecated. Use --am-class instead.")
  18.           }
  19.           amClass = value
  20.           args = tail
  21.         case ("--master-memory" | "--driver-memory") :: MemoryParam(value) :: tail =>
  22.           if (args(0) == "--master-memory") {
  23.             println("--master-memory is deprecated. Use --driver-memory instead.")
  24.           }
  25.           amMemory = value
  26.           args = tail
  27.         case ("--num-workers" | "--num-executors") :: IntParam(value) :: tail =>
  28.           if (args(0) == "--num-workers") {
  29.             println("--num-workers is deprecated. Use --num-executors instead.")
  30.           }
  31.           numExecutors = value
  32.           args = tail
  33.         case ("--worker-memory" | "--executor-memory") :: MemoryParam(value) :: tail =>
  34.           if (args(0) == "--worker-memory") {
  35.             println("--worker-memory is deprecated. Use --executor-memory instead.")
  36.           }
  37.           executorMemory = value
  38.           args = tail
  39.         case ("--worker-cores" | "--executor-cores") :: IntParam(value) :: tail =>
  40.           if (args(0) == "--worker-cores") {
  41.             println("--worker-cores is deprecated. Use --executor-cores instead.")
  42.           }
  43.           executorCores = value
  44.           args = tail
  45.         case ("--queue") :: value :: tail =>
  46.           amQueue = value
  47.           args = tail
  48.         case ("--name") :: value :: tail =>
  49.           appName = value
  50.           args = tail
  51.         case ("--addJars") :: value :: tail =>
  52.           addJars = value
  53.           args = tail
  54.         case ("--files") :: value :: tail =>
  55.           files = value
  56.           args = tail
  57.         case ("--archives") :: value :: tail =>
  58.           archives = value
  59.           args = tail
  60.         case Nil =>
  61.           if (userClass == null) {
  62.             printUsageAndExit(1)
  63.           }
  64.         case _ =>
  65.           printUsageAndExit(1, args)
  66.       }
  67.     }
复制代码



ApplicationMaster
直接看run方法就可以了,main函数就干了那么一件事...
  1.   def run() {
  2.     // 设置本地目录,默认是先使用yarn的YARN_LOCAL_DIRS目录,再到LOCAL_DIRS
  3.     System.setProperty("spark.local.dir", getLocalDirs())
  4.     // set the web ui port to be ephemeral for yarn so we don't conflict with
  5.     // other spark processes running on the same box
  6.     System.setProperty("spark.ui.port", "0")
  7.     // when running the AM, the Spark master is always "yarn-cluster"
  8.     System.setProperty("spark.master", "yarn-cluster")
  9.   // 设置优先级为30,和mapreduce的优先级一样。它比HDFS的优先级高,因为它的操作是清理该作业在hdfs上面的Staging目录
  10.     ShutdownHookManager.get().addShutdownHook(new AppMasterShutdownHook(this), 30)
  11.     appAttemptId = getApplicationAttemptId()
  12.   // 通过yarn.resourcemanager.am.max-attempts来设置,默认是2
  13.   // 目前发现它只在清理Staging目录的时候用
  14.     isLastAMRetry = appAttemptId.getAttemptId() >= maxAppAttempts
  15.     amClient = AMRMClient.createAMRMClient()
  16.     amClient.init(yarnConf)
  17.     amClient.start()
  18.     // setup AmIpFilter for the SparkUI - do this before we start the UI
  19.   //  方法的介绍说是yarn用来保护ui界面的,我感觉是设置ip代理的
  20.     addAmIpFilter()
  21.   //  注册ApplicationMaster到内部的列表里
  22.     ApplicationMaster.register(this)
  23.     // 安全认证相关的东西,默认是不开启的,省得给自己找事
  24.     val securityMgr = new SecurityManager(sparkConf)
  25.     // 启动driver程序
  26.     userThread = startUserClass()
  27.     // 等待SparkContext被实例化,主要是等待spark.driver.port property被使用
  28.   // 等待结束之后,实例化一个YarnAllocationHandler
  29.     waitForSparkContextInitialized()
  30.     // Do this after Spark master is up and SparkContext is created so that we can register UI Url.
  31.   // 向yarn注册当前的ApplicationMaster, 这个时候isFinished不能为true,是true就说明程序失败了
  32.     synchronized {
  33.       if (!isFinished) {
  34.         registerApplicationMaster()
  35.         registered = true
  36.       }
  37.     }
  38.     // 申请Container来启动Executor
  39.     allocateExecutors()
  40.     // 等待程序运行结束
  41.     userThread.join()
  42.     System.exit(0)
  43.   }
复制代码



run方法里面主要干了5项工作:
1、初始化工作

2、启动driver程序

3、注册ApplicationMaster

4、分配Executors

5、等待程序运行结束

我们重点看分配Executor方法。
  1.   private def allocateExecutors() {
  2.     try {
  3.       logInfo("Allocating " + args.numExecutors + " executors.")
  4.       // 分host、rack、任意机器三种类型向ResourceManager提交ContainerRequest
  5.     // 请求的Container数量可能大于需要的数量
  6.       yarnAllocator.addResourceRequests(args.numExecutors)
  7.       // Exits the loop if the user thread exits.
  8.       while (yarnAllocator.getNumExecutorsRunning < args.numExecutors && userThread.isAlive) {
  9.         if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
  10.           finishApplicationMaster(FinalApplicationStatus.FAILED, "max number of executor failures reached")
  11.         }
  12.      // 把请求回来的资源进行分配,并释放掉多余的资源
  13.         yarnAllocator.allocateResources()
  14.         ApplicationMaster.incrementAllocatorLoop(1)
  15.         Thread.sleep(100)
  16.       }
  17.     } finally {
  18.       // In case of exceptions, etc - ensure that count is at least ALLOCATOR_LOOP_WAIT_COUNT,
  19.       // so that the loop in ApplicationMaster#sparkContextInitialized() breaks.
  20.       ApplicationMaster.incrementAllocatorLoop(ApplicationMaster.ALLOCATOR_LOOP_WAIT_COUNT)
  21.     }
  22.     logInfo("All executors have launched.")
  23.     // 启动一个线程来状态报告
  24.     if (userThread.isAlive) {
  25.       // Ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapses.
  26.       val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
  27.       // we want to be reasonably responsive without causing too many requests to RM.
  28.       val schedulerInterval = sparkConf.getLong("spark.yarn.scheduler.heartbeat.interval-ms", 5000)
  29.       // must be <= timeoutInterval / 2.
  30.       val interval = math.min(timeoutInterval / 2, schedulerInterval)
  31.       launchReporterThread(interval)
  32.     }
  33.   }
复制代码


这里面我们只需要看addResourceRequests和allocateResources方法即可。

先说addResourceRequests方法,代码就不贴了。

Client向ResourceManager提交Container的请求,分三种类型:优先选择机器、同一个rack的机器、任意机器。

优先选择机器是在RDD里面的getPreferredLocations获得的机器位置,如果没有优先选择机器,也就没有同一个rack之说了,可以是任意机器。

下面我们接着看allocateResources方法。
  1.   def allocateResources() {
  2.     // We have already set the container request. Poll the ResourceManager for a response.
  3.     // This doubles as a heartbeat if there are no pending container requests.
  4.   // 之前已经提交过Container请求了,现在只需要获取response即可
  5.     val progressIndicator = 0.1f
  6.     val allocateResponse = amClient.allocate(progressIndicator)
  7.     val allocatedContainers = allocateResponse.getAllocatedContainers()
  8.     if (allocatedContainers.size > 0) {
  9.       var numPendingAllocateNow = numPendingAllocate.addAndGet(-1 * allocatedContainers.size)
  10.       if (numPendingAllocateNow < 0) {
  11.         numPendingAllocateNow = numPendingAllocate.addAndGet(-1 * numPendingAllocateNow)
  12.       }
  13.       val hostToContainers = new HashMap[String, ArrayBuffer[Container]]()
  14.       for (container <- allocatedContainers) {
  15.      // 内存 > Executor所需内存 + 384
  16.         if (isResourceConstraintSatisfied(container)) {
  17.           // 把container收入名册当中,等待发落
  18.           val host = container.getNodeId.getHost
  19.           val containersForHost = hostToContainers.getOrElseUpdate(host, new ArrayBuffer[Container]())
  20.           containersForHost += container
  21.         } else {
  22.           // 内存不够,释放掉它
  23.           releaseContainer(container)
  24.         }
  25.       }
  26.       // 找到合适的container来使用.
  27.       val dataLocalContainers = new HashMap[String, ArrayBuffer[Container]]()
  28.       val rackLocalContainers = new HashMap[String, ArrayBuffer[Container]]()
  29.       val offRackContainers = new HashMap[String, ArrayBuffer[Container]]()
  30.     // 遍历所有的host
  31.       for (candidateHost <- hostToContainers.keySet) {
  32.         val maxExpectedHostCount = preferredHostToCount.getOrElse(candidateHost, 0)
  33.         val requiredHostCount = maxExpectedHostCount - allocatedContainersOnHost(candidateHost)
  34.         val remainingContainersOpt = hostToContainers.get(candidateHost)
  35.         var remainingContainers = remainingContainersOpt.get
  36.       
  37.         if (requiredHostCount >= remainingContainers.size) {
  38.           // 需要的比现有的多,把符合数据本地性的添加到dataLocalContainers映射关系里
  39.           dataLocalContainers.put(candidateHost, remainingContainers)
  40.           // 没有containner剩下的.
  41.           remainingContainers = null
  42.         } else if (requiredHostCount > 0) {
  43.           // 获得的container比所需要的多,把多余的释放掉
  44.           val (dataLocal, remaining) = remainingContainers.splitAt(remainingContainers.size - requiredHostCount)
  45.           dataLocalContainers.put(candidateHost, dataLocal)
  46.           for (container <- remaining) releaseContainer(container)
  47.           remainingContainers = null
  48.         }
  49.         // 数据所在机器已经分配满任务了,只能在同一个rack里面挑选了
  50.         if (remainingContainers != null) {
  51.           val rack = YarnAllocationHandler.lookupRack(conf, candidateHost)
  52.           if (rack != null) {
  53.             val maxExpectedRackCount = preferredRackToCount.getOrElse(rack, 0)
  54.             val requiredRackCount = maxExpectedRackCount - allocatedContainersOnRack(rack) -
  55.               rackLocalContainers.getOrElse(rack, List()).size
  56.             if (requiredRackCount >= remainingContainers.size) {
  57.               // Add all remaining containers to to `dataLocalContainers`.
  58.               dataLocalContainers.put(rack, remainingContainers)
  59.               remainingContainers = null
  60.             } else if (requiredRackCount > 0) {
  61.               // Container list has more containers that we need for data locality.
  62.               val (rackLocal, remaining) = remainingContainers.splitAt(remainingContainers.size - requiredRackCount)
  63.               val existingRackLocal = rackLocalContainers.getOrElseUpdate(rack, new ArrayBuffer[Container]())
  64.               existingRackLocal ++= rackLocal
  65.               remainingContainers = remaining
  66.             }
  67.           }
  68.         }
  69.         if (remainingContainers != null) {
  70.           // 还是不够,只能放到别的rack的机器上运行了
  71.           offRackContainers.put(candidateHost, remainingContainers)
  72.         }
  73.       }
  74.       // 按照数据所在机器、同一个rack、任意机器来排序
  75.       val allocatedContainersToProcess = new ArrayBuffer[Container](allocatedContainers.size)
  76.       allocatedContainersToProcess ++= TaskSchedulerImpl.prioritizeContainers(dataLocalContainers)
  77.       allocatedContainersToProcess ++= TaskSchedulerImpl.prioritizeContainers(rackLocalContainers)
  78.       allocatedContainersToProcess ++= TaskSchedulerImpl.prioritizeContainers(offRackContainers)
  79.       // 遍历选择了的Container,为每个Container启动一个ExecutorRunnable线程专门负责给它发送命令
  80.       for (container <- allocatedContainersToProcess) {
  81.         val numExecutorsRunningNow = numExecutorsRunning.incrementAndGet()
  82.         val executorHostname = container.getNodeId.getHost
  83.         val containerId = container.getId
  84.      // 内存需要大于Executor的内存 + 384
  85.         val executorMemoryOverhead = (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
  86.         if (numExecutorsRunningNow > maxExecutors) {
  87.           // 正在运行的比需要的多了,释放掉多余的Container
  88.           releaseContainer(container)
  89.           numExecutorsRunning.decrementAndGet()
  90.         } else {
  91.           val executorId = executorIdCounter.incrementAndGet().toString
  92.           val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
  93.             sparkConf.get("spark.driver.host"),
  94.             sparkConf.get("spark.driver.port"),
  95.             CoarseGrainedSchedulerBackend.ACTOR_NAME)
  96.           // To be safe, remove the container from `pendingReleaseContainers`.
  97.           pendingReleaseContainers.remove(containerId)
  98.          // 把container记录到已分配的rack的映射关系当中
  99.           val rack = YarnAllocationHandler.lookupRack(conf, executorHostname)
  100.           allocatedHostToContainersMap.synchronized {
  101.             val containerSet = allocatedHostToContainersMap.getOrElseUpdate(executorHostname,
  102.               new HashSet[ContainerId]())
  103.             containerSet += containerId
  104.             allocatedContainerToHostMap.put(containerId, executorHostname)
  105.             if (rack != null) {
  106.               allocatedRackCount.put(rack, allocatedRackCount.getOrElse(rack, 0) + 1)
  107.             }
  108.           }
  109.       // 启动一个线程给它进行跟踪服务,给它发送运行Executor的命令
  110.           val executorRunnable = new ExecutorRunnable(
  111.             container,
  112.             conf,
  113.             sparkConf,
  114.             driverUrl,
  115.             executorId,
  116.             executorHostname,
  117.             executorMemory,
  118.             executorCores)
  119.           new Thread(executorRunnable).start()
  120.         }
  121.       }
  122.       
  123.   }
复制代码


1、把从ResourceManager中获得的Container进行选择,选择顺序是按照前面的介绍的三种类别依次进行,优先选择机器 > 同一个rack的机器 > 任意机器。

2、选择了Container之后,给每一个Container都启动一个ExecutorRunner一对一贴身服务,给它发送运行CoarseGrainedExecutorBackend的命令。

3、ExecutorRunner通过NMClient来向NodeManager发送请求。



总结:
把作业发布到yarn上面去执行这块涉及到的类不多,主要是涉及到Client、ApplicationMaster、YarnAllocationHandler、ExecutorRunner这四个类。

1、Client作为Yarn的客户端,负责向Yarn发送启动ApplicationMaster的命令。

2、ApplicationMaster就像项目经理一样负责整个项目所需要的工作,包括请求资源,分配资源,启动Driver和Executor,Executor启动失败的错误处理。

3、ApplicationMaster的请求、分配资源是通过YarnAllocationHandler来进行的。

4、Container选择的顺序是:优先选择机器 > 同一个rack的机器 > 任意机器。

5、ExecutorRunner只负责向Container发送启动CoarseGrainedExecutorBackend的命令。

6、Executor的错误处理是在ApplicationMaster的launchReporterThread方法里面,它启动的线程除了报告运行状态,还会监控Executor的运行,一旦发现有丢失的Executor就重新请求。

7、在yarn目录下看到的名称里面带有YarnClient的是属于yarn-client模式的类,实现和前面的也差不多。

其它的内容更多是Yarn的客户端api使用,我也不太会,只是看到了能懂个意思,哈哈。

相关内容推荐:
Spark源码系列(一)spark-submit提交作业过程
Spark源码系列(二)RDD详解
Spark源码系列(三)作业运行过程
Spark源码系列(四)图解作业生命周期
Spark源码系列(五)分布式缓存
Spark源码系列(六)Shuffle的过程解析
Spark源码系列(七)Spark on yarn具体实现
Spark源码系列(八)Spark Streaming实例分析









本文转载自:http://www.cnblogs.com/cenyuhai/p/3834894.html





没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条