
Exploring the Spark Source Code --- Scheduling of Multiple Drivers Submitted to a Spark Cluster (Standalone Submission via the Client Class)

Once a Spark cluster is up, you can submit Applications to run on it. If multiple Applications are submitted to the cluster, how does the cluster schedule them? As usual, we take a source-code perspective to lift the veil on how Spark schedules multiple applications.
Step 1: Submit the application with spark-submit
Let's first look at the spark-submit script:
# Only define a usage function if an upstream script hasn't done so.
if ! type -t usage >/dev/null 2>&1; then
  usage() {
    if [ -n "$1" ]; then
      echo "$1"
    fi
    "$SPARK_HOME"/bin/spark-class org.apache.spark.deploy.SparkSubmit --help
    exit "$2"
  }
  export -f usage
fi
exec "$SPARK_HOME"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"
At the end of the spark-submit script we can see that it hands off to SparkSubmit. Following that lead, we go into SparkSubmit, where the main function is the entry point for submitting an application:
def main(args: Array[String]): Unit = {
  val appArgs = new SparkSubmitArguments(args)
  if (appArgs.verbose) {
    printStream.println(appArgs)
  }
  appArgs.action match {
    case SparkSubmitAction.SUBMIT => submit(appArgs)
    case SparkSubmitAction.KILL => kill(appArgs)
    case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
  }
}


The code above matches on appArgs.action. Since we are submitting, the program takes the case SparkSubmitAction.SUBMIT => submit(appArgs) branch. Entering the submit(appArgs) method, we can see:
val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)
As its name suggests, this line prepares the submission environment. The method returns a 4-element tuple and does a great deal of parsing and checking of run modes and configuration. Let's look inside the method:
if (args.isStandaloneCluster) {
  if (args.useRest) {
    childMainClass = "org.apache.spark.deploy.rest.RestSubmissionClient"
    childArgs += (args.primaryResource, args.mainClass)
  } else {
    // In legacy standalone cluster mode, use Client as a wrapper around the user class
    childMainClass = "org.apache.spark.deploy.Client"
    if (args.supervise) { childArgs += "--supervise" }
    Option(args.driverMemory).foreach { m => childArgs += ("--memory", m) }
    Option(args.driverCores).foreach { c => childArgs += ("--cores", c) }
    childArgs += "launch"
    childArgs += (args.master, args.primaryResource, args.mainClass)
  }
  if (args.childArgs != null) {
    childArgs ++= args.childArgs
  }
}
In the code above we see that the submission path determines childMainClass. Here we are submitting to a standalone cluster over the legacy (non-REST) path, so childMainClass is set to org.apache.spark.deploy.Client, which wraps the user's main class.
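To make the branching easier to follow at a glance, here is a toy distillation of how childMainClass ends up being chosen. SubmitArgs and chooseChildMainClass are made-up names for illustration only (this is not Spark's actual code); the final branch reflects client deploy mode, where the user's main class runs directly in the submitting JVM.

// Illustrative only: a condensed view of the branch shown above.
case class SubmitArgs(isStandaloneCluster: Boolean, useRest: Boolean, mainClass: String)

def chooseChildMainClass(args: SubmitArgs): String =
  if (args.isStandaloneCluster && args.useRest) {
    "org.apache.spark.deploy.rest.RestSubmissionClient" // REST gateway submission
  } else if (args.isStandaloneCluster) {
    "org.apache.spark.deploy.Client"                    // the legacy path followed in this article
  } else {
    args.mainClass                                      // client deploy mode: run the user class in-process
  }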
In the runMain method, the chosen class is then executed via reflection:
mainClass = Class.forName(childMainClass, true, loader)
val mainMethod = mainClass.getMethod("main", new Array[String](0).getClass)
mainMethod.invoke(null, childArgs.toArray)
This invokes the main method of org.apache.spark.deploy.Client, and the parameters set through spark-submit are passed along via childArgs.toArray.
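The mechanism itself is plain JVM reflection, nothing Spark-specific. Below is a minimal self-contained sketch that mirrors those three lines; HelloApp and ReflectiveLaunch are made-up names for illustration, not Spark classes.

// Launch a "main class" by name via reflection, the same mechanism runMain uses for childMainClass.
object HelloApp {
  def main(args: Array[String]): Unit = println("HelloApp got: " + args.mkString(" "))
}

object ReflectiveLaunch {
  def main(args: Array[String]): Unit = {
    val loader = Thread.currentThread().getContextClassLoader
    // Scala objects compile with a static forwarder for main, so the class is invokable by name.
    val mainClass = Class.forName("HelloApp", true, loader)
    val mainMethod = mainClass.getMethod("main", classOf[Array[String]])
    // The whole Array[String] is passed as one reflective argument, just like childArgs.toArray above.
    mainMethod.invoke(null, Array("--master", "spark://host:7077"))
  }
}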
Let's go into the main method of the Client object:
def main(args: Array[String]) {
  if (!sys.props.contains("SPARK_SUBMIT")) {
    println("WARNING: This client is deprecated and will be removed in a future version of Spark")
    println("Use ./bin/spark-submit with \"--master spark://host:port\"")
  }

  val conf = new SparkConf()
  val driverArgs = new ClientArguments(args)

  if (!driverArgs.logLevel.isGreaterOrEqual(Level.WARN)) {
    conf.set("spark.akka.logLifecycleEvents", "true")
  }
  conf.set("spark.rpc.askTimeout", "10")
  conf.set("akka.loglevel", driverArgs.logLevel.toString.replace("WARN", "WARNING"))
  Logger.getRootLogger.setLevel(driverArgs.logLevel)

  val (actorSystem, _) = AkkaUtils.createActorSystem(
    "driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf))

  // Verify driverArgs.master is a valid url so that we can use it in ClientActor safely
  for (m <- driverArgs.masters) {
    Master.toAkkaUrl(m, AkkaUtils.protocol(actorSystem))
  }
  actorSystem.actorOf(Props(classOf[ClientActor], driverArgs, conf))

  actorSystem.awaitTermination()
}
Here we find, perhaps surprisingly, that what gets created is an actor: a ClientActor is created through the Akka actor system. Inside ClientActor, a driver registration request is sent to every master actor:
// This assumes only one Master is active at a time
for (masterActor <- masterActors) {
  masterActor ! RequestSubmitDriver(driverDescription)
}
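masterActor ! RequestSubmitDriver(driverDescription) is Akka's fire-and-forget tell; the reply arrives later as an asynchronous SubmitDriverResponse message handled in ClientActor's receive. For readers unfamiliar with the pattern, here is a tiny self-contained sketch of the same request/response shape; ToyMaster, ToyClient and the simplified message fields are made up for illustration and are not Spark's classes.

import akka.actor.{Actor, ActorRef, ActorSystem, Props}

// Simplified stand-ins for the messages exchanged above.
case class RequestSubmitDriver(desc: String)
case class SubmitDriverResponse(success: Boolean, driverId: Option[String], message: String)

class ToyMaster extends Actor {
  private var nextId = 0
  def receive = {
    case RequestSubmitDriver(desc) =>
      nextId += 1
      // Reply to the requester, just like `sender ! SubmitDriverResponse(...)` in Master.
      sender() ! SubmitDriverResponse(true, Some(s"driver-$nextId"), s"accepted $desc")
  }
}

class ToyClient(master: ActorRef) extends Actor {
  override def preStart(): Unit = master ! RequestSubmitDriver("my-app.jar") // fire-and-forget tell
  def receive = {
    case SubmitDriverResponse(ok, id, msg) =>
      println(s"master replied: ok=$ok id=$id msg=$msg")
      context.system.terminate()
  }
}

object TellSketch {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("sketch")
    val master = system.actorOf(Props[ToyMaster], "master")
    system.actorOf(Props(classOf[ToyClient], master), "client")
  }
}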
The runtime parameters passed through spark-submit are wrapped into a DriverDescription object and handed to the master in that request. The DriverDescription is built as follows:
val driverDescription = new DriverDescription(
  driverArgs.jarUrl,
  driverArgs.memory,
  driverArgs.cores,
  driverArgs.supervise,
  command)
The master actor receives the Client's driver registration request in its receive method; the corresponding code in Master is:
case RequestSubmitDriver(description) => {
  if (state != RecoveryState.ALIVE) {
    val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
      "Can only accept driver submissions in ALIVE state."
    sender ! SubmitDriverResponse(false, None, msg)
  } else {
    logInfo("Driver submitted " + description.command.mainClass)
    val driver = createDriver(description)
    persistenceEngine.addDriver(driver)
    waitingDrivers += driver
    drivers.add(driver)
    schedule()

    // TODO: It might be good to instead have the submission client poll the master to determine
    //       the current status of the driver. For now it's simply "fire and forget".

    sender ! SubmitDriverResponse(true, Some(driver.id),
      s"Driver successfully submitted as ${driver.id}")
  }
}
After receiving the submission, the master creates the driver, persists it through the persistence engine, and adds it to both waitingDrivers and the drivers hash set.
It then calls the schedule() method; every driver registration triggers this call. What exactly does schedule() do? Let's take a look:
private def schedule(): Unit = {
  if (state != RecoveryState.ALIVE) { return }
  // Drivers take strict precedence over executors
  val shuffledWorkers = Random.shuffle(workers) // Randomization helps balance drivers
  for (worker <- shuffledWorkers if worker.state == WorkerState.ALIVE) {
    for (driver <- waitingDrivers) {
      if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
        launchDriver(worker, driver)
        waitingDrivers -= driver
      }
    }
  }
  startExecutorsOnWorkers()
}
In the schedule method we see that the workers are first shuffled, giving a random sequence of workers. The waitingDrivers collection is then traversed, and whenever a worker node has enough free memory and cores to satisfy a driver's requirements, launchDriver is called to launch that driver on the worker and the driver is removed from waitingDrivers. Finally, startExecutorsOnWorkers is called to start Executors on the worker nodes.
From this analysis we can see that launching drivers follows a simple FIFO scheduling model. At this point the Driver/Client side has completed registration with the Master, and drivers are launched (launchDriver) under a simple FIFO scheduling policy.
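To see the FIFO behavior in isolation, here is a toy re-creation of the placement loop with made-up data structures; DriverReq and WorkerInfo here are illustrative stand-ins, not Spark's internal classes. Drivers are handed out in submission order to any shuffled worker that has enough free memory and cores.

import scala.collection.mutable.ArrayBuffer
import scala.util.Random

// Illustrative stand-ins for the master's bookkeeping.
case class DriverReq(id: String, mem: Int, cores: Int)
class WorkerInfo(val id: String, var memoryFree: Int, var coresFree: Int)

object FifoDriverScheduling {
  def main(args: Array[String]): Unit = {
    val waitingDrivers = ArrayBuffer(
      DriverReq("driver-0", mem = 2048, cores = 2),
      DriverReq("driver-1", mem = 4096, cores = 4))
    val workers = Seq(
      new WorkerInfo("worker-a", memoryFree = 4096, coresFree = 4),
      new WorkerInfo("worker-b", memoryFree = 8192, coresFree = 8))

    // Shuffle workers so drivers are not always packed onto the same node,
    // then assign waiting drivers in submission (FIFO) order to any worker that fits.
    for (worker <- Random.shuffle(workers)) {
      for (driver <- waitingDrivers.toList) { // iterate over a copy; waitingDrivers is mutated below
        if (worker.memoryFree >= driver.mem && worker.coresFree >= driver.cores) {
          worker.memoryFree -= driver.mem
          worker.coresFree -= driver.cores
          println(s"launch ${driver.id} on ${worker.id}")
          waitingDrivers -= driver
        }
      }
    }
    waitingDrivers.foreach(d => println(s"${d.id} is still waiting for resources"))
  }
}

A driver that cannot be placed in one pass simply stays in waitingDrivers; the real schedule() is invoked again whenever cluster resources change (for example when a new worker registers or a running driver or application finishes), so it will be retried on a later pass.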
