分享

SparkR架构详解第二章:R进程与JVM的通信

本帖最后由 regan 于 2016-10-12 16:18 编辑

1.driver端R进程如何与JVM通信的?
2.executor端Rworker进程如何与JVM通信的?
k_r_dataflow.png
在Spark2.X中SparkContext已经被SparkSession所代替。在sparkR.R文件中可以看到有sparkR.session方法的定义,这个方法主要是初始化并放回SparkSession对象,这个对象中握有SparkContext的引用,因此可以这样理解,在SparkR项目中,用R代码重写了Spark中的Scala代码。不过别忘了,这可是在R代码中!调度框架仍然是基于JVM的,R不可能重复制造轮子,既然有了现成的调度框架,那直接用就好了。这就涉及到了R进程与JVM进程的通信了。Driver端的R进程如何与JVM进程通信呢?先来看一看R写的SparkR的小程序,如下:
#R中引入包,类似于java、scala中的importlibrary(SparkR)
#R是脚本语言,如果你写过JS代码对R语法就应该很了解了,直接用SparkR包中的spark.init方法创建SparkSession
sc <- sparkR.session(master="local[1]",sparkEnvir=list(spark.executor.memory="1g",spark.cores.max="10"))
#调用SQLContext.R中的read.text方法,读取本地的文本
lines <- read.text("file:///home/spark/test")
#直接打印
head(lines)
保存文件为test.R 使用./spark-submit test.R命令提交脚本执行。
结果如下所示:
16/10/11 17:22:29 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/10/11 17:22:30 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
            value
1      1 2 3 4 5
2      6 7 8 9 10
3  11 12 13 14 15
4  16 17 18 19 20
5  21 22 23 24 25
6 26 27 28 29 30

可以看到调用head函数,直接打印出了文本中的前6行数据!看到这里是不是觉得使用R脚本编写代码比java/scala更简洁!SparkR的确是统计学家的福音啊!!扯远了!赶紧回到正题,在Driver端R进程如何与JVM进程通信的。使用spark-submit提交R脚本,会调用SparkSubmit类进行处理,当判断提交的为R脚本时,是初始化RRunner线程并在线程中启动R进程。R进程将运行上面的test.R脚本。看一下sparkR.session方法的定义:
sparkR.session <- function(  master = "",#设置master,同spark一样
  appName = "SparkR",#设置appName,默认为SparkR
  sparkHome = Sys.getenv("SPARK_HOME"),#得到SPARK_HOME环境变量
  sparkConfig = list(),#sparkConfig为一个列表,默认为空列表
  sparkJars = "",#依赖的sparkJar
  sparkPackages = "",#依赖的包
  enableHiveSupport = TRUE,#是否启用hive支持,...表示还有任意多个变量
  ...) {

  sparkConfigMap <- convertNamedListToEnv(sparkConfig)#转换配置为map对象,保存于sparkConfigMap
  namedParams <- list(...)
  if (length(namedParams) > 0) {
    paramMap <- convertNamedListToEnv(namedParams)
    # Override for certain named parameters
    if (exists("spark.master", envir = paramMap)) {
      master <- paramMap[["spark.master"]]
    }
    if (exists("spark.app.name", envir = paramMap)) {
      appName <- paramMap[["spark.app.name"]]
    }
    overrideEnvs(sparkConfigMap, paramMap)
  }
  # do not download if it is run in the sparkR shell
  if (!nzchar(master) || is_master_local(master)) {
    if (!is_sparkR_shell()) {
      if (is.na(file.info(sparkHome)$isdir)) {
        msg <- paste0("Spark not found in SPARK_HOME: ",
                      sparkHome,
                      " .\nTo search in the cache directory. ",
                      "Installation will start if not found.")
        message(msg)
        packageLocalDir <- install.spark()
        sparkHome <- packageLocalDir
      } else {
        msg <- paste0("Spark package is found in SPARK_HOME: ", sparkHome)
        message(msg)
      }
    }
  }

  if (!exists(".sparkRjsc", envir = .sparkREnv)) {
    sparkExecutorEnvMap <- new.env()
#注意!!!!在创建SparkSession之前,必须先新建SparkContext,sparkR.sparkContext负责创建JavaSparkContext。并在该方法中与JVM建立连接
    sparkR.sparkContext(master, appName, sparkHome, sparkConfigMap, sparkExecutorEnvMap,
       sparkJars, sparkPackages)
    stopifnot(exists(".sparkRjsc", envir = .sparkREnv))
  }
#如果.sparkREnv环境变量中存在.sparkRsession
  if (exists(".sparkRsession", envir = .sparkREnv)) {
    sparkSession <- get(".sparkRsession", envir = .sparkREnv)
    # Apply config to Spark Context and Spark Session if already there
    # Cannot change enableHiveSupport
    callJStatic("org.apache.spark.sql.api.r.SQLUtils",
                "setSparkContextSessionConf",
                sparkSession,
                sparkConfigMap)
  } else {#不存在则调用callJStatic方法,调用SQLUtils类中的getOrCreateSparkSession方法创建SparkSession
    jsc <- get(".sparkRjsc", envir = .sparkREnv)
    sparkSession <- callJStatic("org.apache.spark.sql.api.r.SQLUtils",
                                "getOrCreateSparkSession",
                                jsc,
                                sparkConfigMap,
                                enableHiveSupport)
    assign(".sparkRsession", sparkSession, envir = .sparkREnv)
  }
  sparkSession
}
上面的代码中,我注解了R与JVM的连接,在sparkR.sparkContext方法中,有如下的代码(代码有点长,可是别紧张!!):
sparkR.sparkContext <- function(  master = "",
  appName = "SparkR",
  sparkHome = Sys.getenv("SPARK_HOME"),
  sparkEnvirMap = new.env(),
  sparkExecutorEnvMap = new.env(),
  sparkJars = "",
  sparkPackages = "") {
   #判断.sparkREnv环境变量中是否已经存在.sparkRjsc,如果已经存在了,则使用get方法从.sparkREnv中取出.sparkRjsc对象并返回
  if (exists(".sparkRjsc", envir = .sparkREnv)) {
    cat(paste("Re-using existing Spark Context.",
              "Call sparkR.session.stop() or restart R to create a new Spark Context\n"))
    return(get(".sparkRjsc", envir = .sparkREnv))
  }
#处理spark的jar包
  jars <- processSparkJars(sparkJars)
  packages <- processSparkPackages(sparkPackages)#处理依赖包

#从环境变量中取出EXISTING_SPARK_BACKEND_PORT的值,如果没有默认返回空串
  existingPort <- Sys.getenv("EXISTING_SPARKR_BACKEND_PORT", "")
#判断existingPort是否为空串,使用spark-submit提交的R脚本,会调用RRunner,在RRunner中会设置为非空值
  if (existingPort != "") {
    if (length(packages) != 0) {
      warning(paste("sparkPackages has no effect when using spark-submit or sparkR shell",
                    " please use the --packages commandline instead", sep = ","))
    }
    #将存在的端口号赋值给backendPort
    backendPort <- existingPort
  } else {#sparkR命令行交互中运行
    #以backend_port创建临时文件
    path <- tempfile(pattern = "backend_port")
    #如果是用sparkR脚本运行,则会提交SPARK_SUBMIT_ARGS参数,该参数的值为sparkr-shell
    submitOps <- getClientModeSparkSubmitOpts(
        Sys.getenv("SPARKR_SUBMIT_ARGS", "sparkr-shell"),
        sparkEnvirMap)
   #调用spark-submit脚本,初始化JVM。其实从这里可以看出,在repl命令行中运行R命令,最终也是会转换为运行spark-submit脚本的,而spark-submit脚本的运行将会初始化JVM
    launchBackend(
        args = path,
        sparkHome = sparkHome,
        jars = jars,
        sparkSubmitOpts = submitOps,
        packages = packages)
    # wait atmost 100 seconds for JVM to launch
    wait <- 0.1
    for (i in 1:25) {
      Sys.sleep(wait)
      if (file.exists(path)) {
        break
      }
      wait <- wait * 1.25
    }
    if (!file.exists(path)) {
      stop("JVM is not ready after 10 seconds")
    }
    f <- file(path, open = "rb")
    backendPort <- readInt(f)
    monitorPort <- readInt(f)
    rLibPath <- readString(f)
    close(f)
    file.remove(path)
    if (length(backendPort) == 0 || backendPort == 0 ||
        length(monitorPort) == 0 || monitorPort == 0 ||
        length(rLibPath) != 1) {
      stop("JVM failed to launch")
    }
   #注意!!!,R与JVM的连接就是在这里建立的啦!!!R中的socketConnection将会与服务端建立连接,而这里的连接是RBackend,是由RRunner启动的!这里的.monitorcOnn为监控连接
    assign(".monitorConn", socketConnection(port = monitorPort), envir = .sparkREnv)
    assign(".backendLaunched", 1, envir = .sparkREnv)
    if (rLibPath != "") {
      assign(".libPath", rLibPath, envir = .sparkREnv)
      .libPaths(c(rLibPath, .libPaths()))
    }
  }
  #在次调用connectBackend方法,与本地启动的JVM建立连接!
  .sparkREnv$backendPort <- backendPort
  tryCatch({
    connectBackend("localhost", backendPort)
  },
  error = function(err) {
    stop("Failed to connect JVM\n")
  })

  if (nchar(sparkHome) != 0) {
    sparkHome <- suppressWarnings(normalizePath(sparkHome))
  }

  if (is.null(sparkExecutorEnvMap$LD_LIBRARY_PATH)) {
    sparkExecutorEnvMap[["LD_LIBRARY_PATH"]] <-
      paste0("$LD_LIBRARY_PATH:", Sys.getenv("LD_LIBRARY_PATH"))
  }

  # Classpath separator is ";" on Windows
  # URI needs four /// as from http://stackoverflow.com/a/18522792
  if (.Platform$OS.type == "unix") {
    uriSep <- "//"
  } else {
    uriSep <- "////"
  }
  localJarPaths <- lapply(jars,
                          function(j) { utils::URLencode(paste("file:", uriSep, j, sep = "")) })

  # Set the start time to identify jobjs
  # Seconds resolution is good enough for this purpose, so use ints
  assign(".scStartTime", as.integer(Sys.time()), envir = .sparkREnv)
  #R与JVM建立连接后,调用callJStatic调用org.apache.spark.api.r.RRDD中的createSparkContext方法创建JavaSparkContext,并将该对象赋值给.sparkRjsc变量。
  assign(
    ".sparkRjsc",
    callJStatic(
      "org.apache.spark.api.r.RRDD",
      "createSparkContext",
      master,
      appName,
      as.character(sparkHome),
      localJarPaths,
      sparkEnvirMap,
      sparkExecutorEnvMap),
    envir = .sparkREnv
  )

  sc <- get(".sparkRjsc", envir = .sparkREnv)

  # Register a finalizer to sleep 1 seconds on R exit to make RStudio happy
  reg.finalizer(.sparkREnv, function(x) { Sys.sleep(1) }, onexit = TRUE)
  #返回JavaSparkContext的R对象
  sc
}

总结:R语言的强大之处在于其丰富的统计功能,以及大量的第三方包。R本身不能进行分布式计算,而且R语言的底层构架也是只支持单线程的。sparkR的出现,将Spark优秀的分布式扩展能力,以及R的统计功能完美的结合,虽然在R于JVM通信会有性能的开销,但是SparkR的确为统计学家及R社区注入了活力。现在SparkR可以分布与几千台计算机上进行分布式并行的计算,可以处理PB级的数据量,而且SparkR中的机器学习、数据挖掘库在减少建模难度的同时,也为企业在最短的时间带来最大的效益!



所以,没事搞搞R,搞搞Spark还是不错的!

没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条