分享

Spark on YARN两种运行模式介绍


问题导读

1.Spark在YARN中有几种模式?
2.Yarn Cluster模式,Driver程序在YARN中运行,应用的运行结果在什么地方可以查看?
3.由client向ResourceManager提交请求,并上传jar到HDFS上包含哪些步骤?
4.传递给app的参数应该通过什么来指定?
5.什么模式下最后将结果输出到terminal中?






Spark在YARN中有yarn-cluster和yarn-client两种运行模式:
I. Yarn Cluster
Spark Driver首先作为一个ApplicationMaster在YARN集群中启动,客户端提交给ResourceManager的每一个job都会在集群的worker节点上分配一个唯一的ApplicationMaster,由该ApplicationMaster管理全生命周期的应用。因为Driver程序在YARN中运行,所以事先不用启动Spark Master/Client,应用的运行结果不能在客户端显示(可以在history server中查看),所以最好将结果保存在HDFS而非stdout输出,客户端的终端显示的是作为YARN的job的简单运行状况。

sparn-yarn1.png

by @Sandy Ryza

spark-yarn2.png

by 明风@taobao
从terminal的output中看到任务初始化更详细的四个步骤:
  1. 14/09/28 11:24:52 INFO RMProxy: Connecting to ResourceManager at hdp01/172.19.1.231:8032
  2. 14/09/28 11:24:52 INFO Client: Got Cluster metric info from ApplicationsManager (ASM), number of NodeManagers: 4
  3. 14/09/28 11:24:52 INFO Client: Queue info ... queueName: root.default, queueCurrentCapacity: 0.0, queueMaxCapacity: -1.0,
  4. queueApplicationCount = 0, queueChildQueueCount = 0
  5. 14/09/28 11:24:52 INFO Client: Max mem capabililty of a single resource in this cluster 8192
  6. 14/09/28 11:24:53 INFO Client: Uploading file:/usr/lib/spark/examples/lib/spark-examples_2.10-1.0.0-cdh5.1.0.jar to hdfs://hdp01:8020/user/spark/.sparkStaging/application_1411874193696_0003/spark-examples_2.10-1.0.0-cdh5.1.0.jar
  7. 14/09/28 11:24:54 INFO Client: Uploading file:/usr/lib/spark/assembly/lib/spark-assembly-1.0.0-cdh5.1.0-hadoop2.3.0-cdh5.1.0.jar to hdfs://hdp01:8020/user/spark/.sparkStaging/application_1411874193696_0003/spark-assembly-1.0.0-cdh5.1.0-hadoop2.3.0-cdh5.1.0.jar
  8. 14/09/28 11:24:55 INFO Client: Setting up the launch environment
  9. 14/09/28 11:24:55 INFO Client: Setting up container launch context
  10. 14/09/28 11:24:55 INFO Client: Command for starting the Spark ApplicationMaster: List($JAVA_HOME/bin/java, -server, -Xmx512m, -Djava.io.tmpdir=$PWD/tmp, -Dspark.master="spark://hdp01:7077", -Dspark.app.name="org.apache.spark.examples.SparkPi", -Dspark.eventLog.enabled="true", -Dspark.eventLog.dir="/user/spark/applicationHistory", -Dlog4j.configuration=log4j-spark-container.properties, org.apache.spark.deploy.yarn.ApplicationMaster, --class, org.apache.spark.examples.SparkPi, --jar , file:/usr/lib/spark/examples/lib/spark-examples_2.10-1.0.0-cdh5.1.0.jar, , --executor-memory, 1024, --executor-cores, 1, --num-executors , 2, 1>, <LOG_DIR>/stdout, 2>, <LOG_DIR>/stderr)
  11. 14/09/28 11:24:55 INFO Client: Submitting application to ASM
  12. 14/09/28 11:24:55 INFO YarnClientImpl: Submitted application application_1411874193696_0003
  13. 14/09/28 11:24:56 INFO Client: Application report from ASM:
  14. application identifier: application_1411874193696_0003
  15. appId: 3
  16. clientToAMToken: null
  17. appDiagnostics:
  18. appMasterHost: N/A
  19. appQueue: root.spark
  20. appMasterRpcPort: -1
  21. appStartTime: 1411874695327
  22. yarnAppState: ACCEPTED
  23. distributedFinalState: UNDEFINED
  24. appTrackingUrl: http://hdp01:8088/proxy/application_1411874193696_0003/
  25. appUser: spark
复制代码

1. 由client向ResourceManager提交请求,并上传jar到HDFS上
这期间包括四个步骤:
a). 连接到RM
b). 从RM ASM(ApplicationsManager )中获得metric、queue和resource等信息。
c). upload app jar and spark-assembly jar
d). 设置运行环境和container上下文(launch-container.sh等脚本)

2.
ResouceManager向NodeManager申请资源,创建Spark ApplicationMaster(每个SparkContext都有一个ApplicationMaster)
3. NodeManager启动Spark App Master,并向ResourceManager AsM注册
4. Spark ApplicationMaster从HDFS中找到jar文件,启动DAGscheduler和YARN Cluster Scheduler
5. ResourceManager向ResourceManager AsM注册申请container资源(INFO YarnClientImpl: Submitted application)
6. ResourceManager通知NodeManager分配Container,这时可以收到来自ASM关于container的报告。(每个container的对应一个executor)
7. Spark ApplicationMaster直接和container(executor)进行交互,完成这个分布式任务。
需要注意的是:
a). Spark中的localdir会被yarn.nodemanager.local-dirs替换
b). 允许失败的节点数(spark.yarn.max.worker.failures)为executor数量的两倍数量,最小为3.
c). SPARK_YARN_USER_ENV传递给spark进程的环境变量
d). 传递给app的参数应该通过–args指定。

部署:
环境介绍:

hdp0[1-4]四台主机
hadoop使用CDH 5.1版本: hadoop-2.3.0+cdh5.1.0+795-1.cdh5.1.0.p0.58.el6.x86_64
直接下载对应2.3.0的pre-build版本http://spark.apache.org/downloads.html
下载完毕后解压,检查spark-assembly目录:

  1. file /home/spark/spark-1.1.0-bin-hadoop2.3/lib/spark-assembly-1.1.0-hadoop2.3.0.jar
  2. /home/spark/spark-1.1.0-bin-hadoop2.3/lib/spark-assembly-1.1.0-hadoop2.3.0.jar: Zip archive data, at least v2.0 to extract
复制代码


然后输出环境变量HADOOP_CONF_DIR/YARN_CONF_DIR和SPARK_JAR(可以设置到spark-env.sh中)
  1. export HADOOP_CONF_DIR=/etc/hadoop/etc
  2. export SPARK_JAR=/home/spark/spark-1.1.0-bin-hadoop2.3/lib/spark-assembly-1.1.0-hadoop2.3.0.jar
复制代码


如果使用cloudera manager 5,在Spark Service的操作中可以找到Upload Spark Jar将spark-assembly上传到HDFS上。


spark-yarn3.png



Spark Jar Location (HDFS)
spark_jar_hdfs_path
/user/spark/share/lib/spark-assembly.jar
默认值
The location of the Spark jar in HDFS
Spark History Location (HDFS)
spark.eventLog.dir
/user/spark/applicationHistory
默认值
The location of Spark application history logs in HDFS. Changing this value will not move existing logs to the new location.
提交任务,此时在YARN的web UI和history Server上就可以看到运行状态信息。
  1. spark-submit --class org.apache.spark.examples.SparkPi --master yarn-cluster /usr/lib/spark/examples/lib/spark-examples_2.10-1.0.0-cdh5.1.0.jar
复制代码

II. yarn-client
(YarnClientClusterScheduler)查看对应类的文件
在yarn-client模式下,Driver运行在Client上,通过ApplicationMaster向RM获取资源。本地Driver负责与所有的executor container进行交互,并将最后的结果汇总。结束掉终端,相当于kill掉这个spark应用。一般来说,如果运行的结果仅仅返回到terminal上时需要配置这个。

spark-yarn4.png

客户端的Driver将应用提交给Yarn后,Yarn会先后启动ApplicationMaster和executor,另外ApplicationMaster和executor都 是装载在container里运行,container默认的内存是1G,ApplicationMaster分配的内存是driver- memory,executor分配的内存是executor-memory。同时,因为Driver在客户端,所以程序的运行结果可以在客户端显 示,Driver以进程名为SparkSubmit的形式存在。
配置YARN-Client模式同样需要HADOOP_CONF_DIR/YARN_CONF_DIR和SPARK_JAR变量。
提交任务测试:
  1. spark-submit --class org.apache.spark.examples.SparkPi --deploy-mode client /usr/lib/spark/examples/lib/spark-examples_2.10-1.0.0-cdh5.1.0.jar
  2. terminal output:
  3. 14/09/28 11:18:34 INFO Client: Command for starting the Spark ApplicationMaster: List($JAVA_HOME/bin/java, -server, -Xmx512m, -Djava.io.tmpdir=$PWD/tmp, -Dspark.tachyonStore.folderName="spark-9287f0f2-2e72-4617-a418-e0198626829b", -Dspark.eventLog.enabled="true", -Dspark.yarn.secondary.jars="", -Dspark.driver.host="hdp01", -Dspark.driver.appUIHistoryAddress="", -Dspark.app.name="Spark Pi", -Dspark.jars="file:/usr/lib/spark/examples/lib/spark-examples_2.10-1.0.0-cdh5.1.0.jar", -Dspark.fileserver.uri="http://172.19.17.231:53558", -Dspark.eventLog.dir="/user/spark/applicationHistory", -Dspark.master="yarn-client", -Dspark.driver.port="35938", -Dspark.httpBroadcast.uri="http://172.19.17.231:43804", -Dlog4j.configuration=log4j-spark-container.properties, org.apache.spark.deploy.yarn.ExecutorLauncher, --class, notused, --jar , null, --args 'hdp01:35938' , --executor-memory, 1024, --executor-cores, 1, --num-executors , 2, 1>, <LOG_DIR>/stdout, 2>, <LOG_DIR>/stderr)
  4. 14/09/28 11:18:34 INFO Client: Submitting application to ASM
  5. 14/09/28 11:18:34 INFO YarnClientSchedulerBackend: Application report from ASM:
  6. appMasterRpcPort: -1
  7. appStartTime: 1411874314198
  8. yarnAppState: ACCEPTED
  9. ......
复制代码
##最后将结果输出到terminal中


加微信w3aboutyun,可拉入技术爱好者群

已有(16)人评论

跳转到指定楼层
xw2016 发表于 2016-7-9 11:00:54
b). 允许失败的节点数(spark.yarn.max.worker.failures)为executor数量的两倍数量,最小为3.
请问楼主,这个怎么算出来的?一般允许失败节点数为总节点数的一半,这个如何理解呢?
回复

使用道具 举报

xw2016 发表于 2016-7-9 10:58:53
b). 允许失败的节点数(spark.yarn.max.worker.failures)为executor数量的两倍数量,最小为3.

请问楼主,这个怎么算出来的?
回复

使用道具 举报

atsky123 发表于 2016-2-24 17:03:49
mobilefactory8 发表于 2016-2-24 16:52
这个问题也困扰了我:(我添加了jars包,还是报错)

spark-1.5.0-bin-hadoop2.6]$ ./bin/spark-submit  ...

不会无缘无故报错的。
先看路径是否正确
在看看包是否正确
回复

使用道具 举报

mobilefactory8 发表于 2016-2-24 16:52:59
这个问题也困扰了我:(我添加了jars包,还是报错)

spark-1.5.0-bin-hadoop2.6]$ ./bin/spark-submit --class org.apache.spark.examples.SparkPi \
>     --jars  /opt/BigData/inst/hadoop-2.6.0/share/hadoop/common/hadoop-common-2.6.0.jar \
>     --master yarn \
>     --deploy-mode client \
>     --driver-memory 1g \
>     --executor-memory 1g \
>     --executor-cores 1 \
>     --queue thequeue \
>     lib/spark-examples*.jar \
>     2

错误信息:        
Exception in thread "main" java.lang.NoSuchMethodError: org.apache.hadoop.conf.Configuration.addDeprecations([Lorg/apache/hadoop/conf/Configuration$DeprecationDelta;)V
        at org.apache.hadoop.yarn.conf.YarnConfiguration.addDeprecatedKeys(YarnConfiguration.java:79)
        at org.apache.hadoop.yarn.conf.YarnConfiguration.<clinit>(YarnConfiguration.java:73)
        at org.apache.spark.deploy.yarn.YarnSparkHadoopUtil.newConfiguration(YarnSparkHadoopUtil.scala:61)
        at org.apache.spark.deploy.SparkHadoopUtil.<init>(SparkHadoopUtil.scala:52)
        at org.apache.spark.deploy.yarn.YarnSparkHadoopUtil.<init>(YarnSparkHadoopUtil.scala:46)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
        at java.lang.Class.newInstance(Class.java:374)
        at org.apache.spark.deploy.SparkHadoopUtil$.liftedTree1$1(SparkHadoopUtil.scala:389)
        at org.apache.spark.deploy.SparkHadoopUtil$.<init>(SparkHadoopUtil.scala:387)
        at org.apache.spark.deploy.SparkHadoopUtil$.<clinit>(SparkHadoopUtil.scala)
        at org.apache.spark.util.Utils$.getSparkOrYarnConfig(Utils.scala:2042)
        at org.apache.spark.storage.BlockManager.<init>(BlockManager.scala:97)
        at org.apache.spark.storage.BlockManager.<init>(BlockManager.scala:173)
        at org.apache.spark.SparkEnv$.create(SparkEnv.scala:345)
        at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:193)
        at org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:276)
        at org.apache.spark.SparkContext.<init>(SparkContext.scala:441)
        at org.apache.spark.examples.SparkPi$.main(SparkPi.scala:29)
        at org.apache.spark.examples.SparkPi.main(SparkPi.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:672)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

能帮忙看看不?谢谢!
回复

使用道具 举报

chyeers 发表于 2016-1-25 16:21:41
那年夏天110 发表于 2016-1-14 17:44
他的问题 看我回复 应该是他想要的答案

嗯嗯,thanks
回复

使用道具 举报

那年夏天110 发表于 2016-1-14 17:44:26
xuanxufeng 发表于 2015-9-10 16:58
合理管理依赖包呢??为什么会有这个问题

他的问题 看我回复 应该是他想要的答案
回复

使用道具 举报

那年夏天110 发表于 2016-1-14 17:42:03
chyeers 发表于 2015-9-10 21:23
因为有的时候运行一个 --class 的时候 要 --jars xxxx,xxx,xxx,xxx   比较多的外部依赖包,有什么方法集 ...

可以处理的 要一个一个写依赖jar 太累人了。有两种办法来解决这个问题:
1. 用maven 收集所有的依赖jar 后 放到统一的文件夹下(如: /home/libs/ 下面一堆jar),提交spark job 的时候指定下这个文件加即可。
    例如:    ./spark-submit --jars $(echo /home/libs/*.jar |tr ' '  ',')  --master  。。。。。。
   这样其实是遍历了下指定的目录 循环加载,就不用自己手动一个个的加。 也方便所依赖jar 的动态增删维护

2.第二种是: 把你收集的所有依赖jar 和你的业务代码  写个脚本 全部打包整一个jar 然后 提交的时候只需指定一个 就可以了。
  但这种要是你修改了业务代码逻辑 就得又重新全部打包 比较麻烦。如果是测试环境测试通的,要丢到生产环境运行的话,这种方式也是挺不错,而且spark job 远程提交 貌似只能这样才能顺利跑通。

以上两种方式均在测试和生产环境 使用过,所有万无一失
      




回复

使用道具 举报

chyeers 发表于 2015-9-10 21:37:33
回复

使用道具 举报

12下一页
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条