分享

如何使用spark将程序提交任务到yarn

本帖最后由 xioaxu790 于 2014-5-19 13:14 编辑
问题导读:
1、怎样使用Spark提交任务到yarn 中 ?
2、提交到yarn 中,有哪些方式 ?
3.如何通过程序提交到yarn





因为spark文档中只介绍了两种用脚本提交到yarn的例子,并没有介绍如何通过程序提交yarn,但是我们的需求需要这样。网上很难找到例子,经过几天摸索,终于用程序提交到yarn成功,下面总结一下。

先介绍官网提交的例子,我用的是spark 0.9.0 hadoop2.2.0

一.使用脚本提交
1.使用spark脚本提交到yarn,首先需要将spark所在的主机和hadoop集群之间hosts相互配置(也就是把spark主机的ip和主机名配置到hadoop所有节点的/etc/hosts里面,再把集群所有节点的ip和主机名配置到spark所在主机的/etc/hosts里面)。
2.然后需要把hadoop目录etc/hadoop下面的*-sit.xml复制到${SPARK_HOME}的conf下面.
3.确保hadoop集群配置了 HADOOP_CONF_DIR or YARN_CONF_DIR

1.yarn-standalone方式提交到yarn
在${SPARK_HOME}下面执行:

  1. SPARK_JAR=./assembly/target/scala-2.10.4/spark-assembly-0.9.0-incubating-hadoop2.2.0.jar \
  2.     ./bin/spark-class org.apache.spark.deploy.yarn.Client \
  3.       --jar ./examples/target/scala-2.10/spark-examples_2.10-assembly-0.9.0-incubating.jar \
  4.       --class org.apache.spark.examples.SparkPi \
  5.       --args yarn-standalone \
  6.       --num-workers 3 \
  7.       --master-memory 2g \
  8.       --worker-memory 2g \
  9.       --worker-cores 1
复制代码

2. yarn-client 方式提交到yarn
在${SPARK_HOME}下面执行:

  1. SPARK_JAR=./assembly/target/scala-2.10.4/spark-assembly-0.9.0-incubating-hadoop2.2.0.jar \
  2. SPARK_YARN_APP_JAR=examples/target/scala-2.10/spark-examples_2.10-assembly-0.9.0-incubating.jar \
  3. ./bin/run-example org.apache.spark.examples.SparkPi yarn-client
复制代码


二、使用程序提交
1.必须使用linux主机提交任务,使用windows提交到linux hadoop集群会报
  1. org.apache.hadoop.util.Shell$ExitCodeException: /bin/bash: 第 0 行: fg: 无任务控制
复制代码

错误。hadoop2.2.0不支持windows提交到linux hadoop集群,网上搜索发现这是hadoop的bug。

2.提交任务的主机和hadoop集群主机名需要在hosts相互配置。

3.因为使用程序提交是使用yarn-client方式,所以必须像上面脚本那样设置环境变量SPARK_JAR 和 SPARK_YARN_APP_JAR
比如我的设置为向提交任务主机~/.bashrc里面添加:
  1. export SPARK_JAR=file:///home/ndyc/software/sparkTest/lib/spark-assembly-0.9.0-incubating-hadoop2.2.0.jar
  2.      export SPARK_YARN_APP_JAR=file:///home/ndyc/software/sparkTest/ndspark-0.0.1.jar
复制代码

file:// 表明是本地文件,如果使用hdfs上的文件将file://替换为hdfs://主机名:端口号。建议使用hdfs来引用 spark-assembly-0.9.0-incubating-hadoop2.2.0.jar,因为这个文件比较大,如果使用file://每次提交任务都需要上传这个jar到各个集群,很慢。

其中SPARK_JAR是${SPARK_HOME}/assembly/target/scala-2.10.4/spark-assembly-0.9.0-incubating-hadoop2.2.0.jar
SPARK_YARN_APP_JAR是自己程序打的jar包,包含自己的测试程序。

4.程序中加入hadoop、yarn、依赖。
注意,如果引入了hbase依赖,需要这样配置

  1. <dependency>
  2.             <groupId>org.apache.hbase</groupId>
  3.             <artifactId>hbase-thrift</artifactId>
  4.             <version>${hbase.version}</version>
  5.             <exclusions>
  6.                 <exclusion>
  7.                     <groupId>org.apache.hadoop</groupId>
  8.                     <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
  9.                 </exclusion>
  10.                 <exclusion>
  11.                     <groupId>org.apache.hadoop</groupId>
  12.                     <artifactId>hadoop-client</artifactId>
  13.                 </exclusion>
  14.             </exclusions>
  15.         </dependency>
复制代码

然后再加入

  1. <dependency>
  2.             <groupId>org.ow2.asm</groupId>
  3.             <artifactId>asm-all</artifactId>
  4.             <version>4.0</version>
  5.         </dependency>
复制代码

否则会报错:

  1. IncompatibleClassChangeError has interface org.objectweb.asm.ClassVisitor as super class
复制代码

异常是因为Hbase jar hadoop-mapreduce-client-jobclient.jar里面使用到了asm3.1 而spark需要的是asm-all-4.0.jar

5. hadoop conf下的*-site.xml需要复制到提交主机的classpath下,或者说maven项目resources下面。

6.编写程序
代码示例:

  1. package com.sdyc.ndspark.sys;
  2. import org.apache.spark.SparkConf;
  3. import org.apache.spark.api.java.JavaPairRDD;
  4. import org.apache.spark.api.java.JavaRDD;
  5. import org.apache.spark.api.java.JavaSparkContext;
  6. import org.apache.spark.api.java.function.Function2;
  7. import org.apache.spark.api.java.function.PairFunction;
  8. import scala.Tuple2;
  9. import java.util.ArrayList;
  10. import java.util.List;
  11. /**
  12. * Created with IntelliJ IDEA.
  13. * User: zarchary
  14. * Date: 14-1-19
  15. * Time: 下午6:23
  16. * To change this template use File | Settings | File Templates.
  17. */
  18. public class ListTest {
  19.     public static void main(String[] args) throws Exception {
  20.         SparkConf sparkConf = new SparkConf();
  21.         sparkConf.setAppName("listTest");
  22.         //使用yarn模式提交
  23.         sparkConf.setMaster("yarn-client");
  24. JavaSparkContext sc = new JavaSparkContext(sparkConf);
  25.         List<String> listA = new ArrayList<String>();
  26.         listA.add("a");
  27.         listA.add("a");
  28.         listA.add("b");
  29.         listA.add("b");
  30.         listA.add("b");
  31.         listA.add("c");
  32.         listA.add("d");
  33.         JavaRDD<String> letterA = sc.parallelize(listA);
  34.         JavaPairRDD<String, Integer> letterB = letterA.map(new PairFunction<String, String, Integer>() {
  35.             @Override
  36.             public Tuple2<String, Integer> call(String s) throws Exception {
  37.                 return new Tuple2<String, Integer>(s, 1);
  38.             }
  39.         });
  40.         letterB = letterB.reduceByKey(new Function2<Integer, Integer, Integer>() {
  41.             public Integer call(Integer i1, Integer i2) {
  42.                 return i1 + i2;
  43.             }
  44.         });
  45.         //颠倒顺序
  46.         JavaPairRDD<Integer, String> letterC = letterB.map(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
  47.             @Override
  48.             public Tuple2<Integer, String> call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
  49.                 return new Tuple2<Integer, String>(stringIntegerTuple2._2, stringIntegerTuple2._1);
  50.             }
  51.         });
  52.         JavaPairRDD<Integer, List<String>> letterD = letterC.groupByKey();
  53. //        //false说明是降序
  54.         JavaPairRDD<Integer, List<String>> letterE = letterD.sortByKey(false);
  55.         System.out.println("========" + letterE.collect());
  56.         System.exit(0);
  57.     }
  58. }
复制代码

代码中master设置为yar-client表明了是使用提交到yarn.

关于spark需要依赖的jar的配置可以参考我的博客spark安装和远程调用。
以上弄完之后就可以运行程序了。
运行后会看到yarn的ui界面出现:
191243_TTEl_132722.jpg


正在执行的过程中会发现hadoop yarn 有的nodemanage会有下面这个进程:

  1. 13247 org.apache.spark.deploy.yarn.WorkerLauncher
复制代码


这是spark的工作进程。
如果接收到异常为:

  1. WARN YarnClientClusterScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
复制代码

出现这个错误是因为提交任务的节点不能和spark工作节点交互,因为提交完任务后提交任务节点上会起一个进程,展示任务进度,大多端口为4044,工作节点需要反馈进度给该该端口,所以如果主机名或者IP在hosts中配置不正确,就会报

WARN YarnClientClusterScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory错误。
所以请检查主机名和IP是否配置正确。

我自己的理解为,程序提交任务到yarn后,会上传SPARK_JAR和SPARK_YARN_APP_JAR到hadoop节点, yarn根据任务情况来分配资源,在nodemanage节点上来启动org.apache.spark.deploy.yarn.WorkerLauncher工作节点来执行spark任务,执行完成后退出。





没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条