分享

Spark通过Java Web提交任务


问题导读


1.本文用到哪些软件?
2.本文的场景是有哪些?
3.Spark通过Java Web提交任务如何实现的?






相关软件版本:

Spark1.4.1 ,Hadoop2.6,Scala2.10.5 , MyEclipse2014,intelliJ IDEA14,JDK1.8,Tomcat7

机器:

windows7 (包含JDK1.8,MyEclipse2014,IntelliJ IDEA14,TOmcat7);

centos6.6虚拟机(Hadoop伪分布式集群,Spark standAlone集群,JDK1.8);

centos7虚拟机(Tomcat,JDK1.8);

1. 场景:
1. windows简单java程序调用Spark,执行Scala开发的Spark程序,这里包含两种模式:

    1> 提交任务到Spark集群,使用standAlone模式执行;

    2> 提交任务到Yarn集群,使用yarn-client的模式;

2. windows 开发java web程序调用Spark,执行Scala开发的Spark程序,同样包含两种模式,参考1.

3. linux运行java web程序调用Spark,执行Scala开发的Spark程序,包含两种模式,参考1.



2. 实现:
1. 简单Scala程序,该程序的功能是读取HDFS中的log日志文件,过滤log文件中的WARN和ERROR的记录,最后把过滤后的记录写入到HDFS中,代码如下:


[mw_shl_code=bash,true]import org.apache.spark.{SparkConf, SparkContext}


/**
* Created by Administrator on 2015/8/23.
*/
object Scala_Test {
  def main(args:Array[String]): Unit ={
    if(args.length!=2){
      System.err.println("Usage:Scala_Test <input> <output>")
    }
    // 初始化SparkConf
    val conf = new SparkConf().setAppName("Scala filter")
    val sc = new SparkContext(conf)

    //  读入数据
    val lines = sc.textFile(args(0))

    // 转换
    val errorsRDD = lines.filter(line => line.contains("ERROR"))
    val warningsRDD = lines.filter(line => line.contains("WARN"))
    val  badLinesRDD = errorsRDD.union(warningsRDD)

    // 写入数据
    badLinesRDD.saveAsTextFile(args(1))

    // 关闭SparkConf
    sc.stop()
  }
}
[/mw_shl_code]


使用IntelliJ IDEA 并打成jar包备用(lz这里命名为spark_filter.jar);

2.  java调用spark_filter.jar中的Scala_Test 文件,并采用Spark standAlone模式

java代码如下:

[mw_shl_code=java,true]package test;

import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.spark.deploy.SparkSubmit;
/**
* @author fansy
*
*/
public class SubmitScalaJobToSpark {

        public static void main(String[] args) {
                SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd-hh-mm-ss");
                String filename = dateFormat.format(new Date());
                String tmp=Thread.currentThread().getContextClassLoader().getResource("").getPath();
                tmp =tmp.substring(0, tmp.length()-8);
                String[] arg0=new String[]{
                                "--master","spark://node101:7077",
                                "--deploy-mode","client",
                                "--name","test java submit job to spark",
                                "--class","Scala_Test",
                                "--executor-memory","1G",
//                                "spark_filter.jar",
                                tmp+"lib/spark_filter.jar",//
                                "hdfs://node101:8020/user/root/log.txt",
                                "hdfs://node101:8020/user/root/badLines_spark_"+filename
                };
               
                SparkSubmit.main(arg0);
        }
}
[/mw_shl_code]

具体操作,使用MyEclipse新建java web工程,把spark_filter.jar 以及spark-assembly-1.4.1-hadoop2.6.0.jar(该文件在Spark压缩文件的lib目录中,同时该文件较大,拷贝需要一定时间) 拷贝到WebRoot/WEB-INF/lib目录。(注意:这里可以直接建立java web项目,在测试java调用时,直接运行java代码即可,在测试web项目时,开启tomcat即可)
java调用spark_filter.jar中的Scala_Test 文件,并采用Yarn模式。采用Yarn模式,不能使用简单的修改master为“yarn-client”或“yarn-cluster”,在使用Spark-shell或者spark-submit的时候,使用这个,同时配置HADOOP_CONF_DIR路径是可以的,但是在这里,读取不到HADOOP的配置,所以这里采用其他方式,使用yarn.Clent提交的方式,java代码如下:


[mw_shl_code=java,true]package test;

import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkConf;
import org.apache.spark.deploy.yarn.Client;
import org.apache.spark.deploy.yarn.ClientArguments;

public class SubmitScalaJobToYarn {

        public static void main(String[] args) {
                SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd-hh-mm-ss");
                String filename = dateFormat.format(new Date());
                String tmp=Thread.currentThread().getContextClassLoader().getResource("").getPath();
                tmp =tmp.substring(0, tmp.length()-8);
                String[] arg0=new String[]{
                                "--name","test java submit job to yarn",
                                "--class","Scala_Test",
                                "--executor-memory","1G",
//                                "WebRoot/WEB-INF/lib/spark_filter.jar",//
                                "--jar",tmp+"lib/spark_filter.jar",//
                               
                                "--arg","hdfs://node101:8020/user/root/log.txt",
                                "--arg","hdfs://node101:8020/user/root/badLines_yarn_"+filename,
                                "--addJars","hdfs://node101:8020/user/root/servlet-api.jar",//
                                "--archives","hdfs://node101:8020/user/root/servlet-api.jar"//
                };
               
//                SparkSubmit.main(arg0);
                Configuration conf = new Configuration();
                String os = System.getProperty("os.name");
                boolean cross_platform =false;
                if(os.contains("Windows")){
                        cross_platform = true;
                }
                conf.setBoolean("mapreduce.app-submission.cross-platform", cross_platform);// 配置使用跨平台提交任务
                conf.set("fs.defaultFS", "hdfs://node101:8020");// 指定namenode
                conf.set("mapreduce.framework.name","yarn"); // 指定使用yarn框架
                conf.set("yarn.resourcemanager.address","node101:8032"); // 指定resourcemanager
                conf.set("yarn.resourcemanager.scheduler.address", "node101:8030");// 指定资源分配器
                conf.set("mapreduce.jobhistory.address","node101:10020");
               
                 System.setProperty("SPARK_YARN_MODE", "true");

                 SparkConf sparkConf = new SparkConf();
                 ClientArguments cArgs = new ClientArguments(arg0, sparkConf);
               
                new Client(cArgs,conf,sparkConf).run();
        }
}
[/mw_shl_code]

SparkServlet如下:
[mw_shl_code=java,true]package servlet;

import java.io.IOException;
import java.io.PrintWriter;

import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import test.SubmitScalaJobToSpark;

public class SparkServlet extends HttpServlet {

        /**
         * Constructor of the object.
         */
        public SparkServlet() {
                super();
        }

        /**
         * Destruction of the servlet. <br>
         */
        public void destroy() {
                super.destroy(); // Just puts "destroy" string in log
                // Put your code here
        }

        /**
         * The doGet method of the servlet. <br>
         *
         * This method is called when a form has its tag value method equals to get.
         *
         * @param request the request send by the client to the server
         * @param response the response send by the server to the client
         * @throws ServletException if an error occurred
         * @throws IOException if an error occurred
         */
        public void doGet(HttpServletRequest request, HttpServletResponse response)
                        throws ServletException, IOException {

                this.doPost(request, response);
        }

        /**
         * The doPost method of the servlet. <br>
         *
         * This method is called when a form has its tag value method equals to post.
         *
         * @param request the request send by the client to the server
         * @param response the response send by the server to the client
         * @throws ServletException if an error occurred
         * @throws IOException if an error occurred
         */
        public void doPost(HttpServletRequest request, HttpServletResponse response)
                        throws ServletException, IOException {
                System.out.println("开始SubmitScalaJobToSpark调用......");
                SubmitScalaJobToSpark.main(null);
                //YarnServlet也只是这里不同
                System.out.println("完成SubmitScalaJobToSpark调用!");
                response.setContentType("text/html");
                PrintWriter out = response.getWriter();
                out.println("<!DOCTYPE HTML PUBLIC \"-//W3C//DTD HTML 4.01 Transitional//EN\">");
                out.println("<HTML>");
                out.println("  <HEAD><TITLE>A Servlet</TITLE></HEAD>");
                out.println("  <BODY>");
                out.print("    This is ");
                out.print(this.getClass());
                out.println(", using the POST method");
                out.println("  </BODY>");
                out.println("</HTML>");
                out.flush();
                out.close();
        }

        /**
         * Initialization of the servlet. <br>
         *
         * @throws ServletException if an error occurs
         */
        public void init() throws ServletException {
                // Put your code here
        }

}
[/mw_shl_code]

这里只是调用了java编写的任务调用类而已。同时,SparServlet和YarnServlet也只是在调用的地方不同而已。
在web测试时,首先直接在MyEclipse上测试,然后拷贝工程WebRoot到centos7,再次运行tomcat,进行测试。



3. 总结及问题1. 测试结果:
   1> java代码直接提交任务到Spark和Yarn,进行日志文件的过滤,测试是成功运行的。可以在Yarn和Spark的监控中看到相关信息:


1.png
2.png

同时,在HDFS可以看到输出的文件:

3.png

2> java web 提交任务到Spark和Yarn,首先需要把spark-assembly-1.4.1-hadoop2.6.0.jar中的javax.servlet文件夹删掉,因为会和tomcat的servlet-api.jar冲突。
    a. 在windows和linux上启动tomcat,提交任务到Spark standAlone,测试成功运行;
    b. 在windows和linux上启动tomcat,提交任务到Yarn,测试失败;
2. 遇到的问题:
    1> java web 提交任务到Yarn,会失败,失败的主要日志如下:


[mw_shl_code=bash,true]15/08/25 11:35:48 ERROR yarn.ApplicationMaster: User class threw exception: java.lang.NoClassDefFoundError: javax/servlet/http/HttpServletResponse
java.lang.NoClassDefFoundError: javax/servlet/http/HttpServletResponse[/mw_shl_code]

这个是因为javax.servlet的包被删掉了,和tomcat的冲突。
同时,在日志中还可以看到:


[mw_shl_code=bash,true]15/08/26 12:39:27 INFO Client: Setting up container launch context for our AM
15/08/26 12:39:27 INFO Client: Preparing resources for our AM container
15/08/26 12:39:27 INFO Client: Uploading resource file:/D:/workspase_scala/SparkWebTest/WebRoot/WEB-INF/lib/spark-assembly-1.4.1-hadoop2.6.0.jar -> hdfs://node101:8020/user/Administrator/.sparkStaging/application_1440464833795_0012/spark-assembly-1.4.1-hadoop2.6.0.jar
15/08/26 12:39:32 INFO Client: Uploading resource file:/D:/workspase_scala/SparkWebTest/WebRoot/WEB-INF/lib/spark_filter.jar -> hdfs://node101:8020/user/Administrator/.sparkStaging/application_1440464833795_0012/spark_filter.jar
15/08/26 12:39:33 INFO Client: Uploading resource file:/C:/Users/Administrator/AppData/Local/Temp/spark-46820caf-06e0-4c51-a479-3bb35666573f/__hadoop_conf__5465819424276830228.zip -> hdfs://node101:8020/user/Administrator/.sparkStaging/application_1440464833795_0012/__hadoop_conf__5465819424276830228.zip
15/08/26 12:39:33 INFO Client: Source and destination file systems are the same. Not copying hdfs://node101:8020/user/root/servlet-api.jar
15/08/26 12:39:33 WARN Client: Resource hdfs://node101:8020/user/root/servlet-api.jar added multiple times to distributed cache.
[/mw_shl_code]

这里在环境初始化的时候,上传了两个jar,一个就是spark-assembly-1.4.1-hadoop2.6.0.jar 还有一个就是我们自定义的jar。上传的spark-assembly-1.4.1-hadoop2.6.0.jar 里面没有javax.servlet的文件夹,所以会报错。在java中直接调用(没有删除javax.servlet的时候)同样会看到这样的日志,同样的上传,那时是可以的,也就是说这里确实是删除了包文件夹的关系。那么如何修复呢?
上传servlet-api到hdfs,同时在使用yarn.Client提交任务的时候,添加相关的参数,这里查看参数,发现两个比较相关的参数,--addJars以及--archive 参数,把这两个参数都添加后,看到日志中确实把这个jar包作为了job的共享文件,但是java web提交任务到yarn 还是报这个类找不到的错误。所以这个办法也是行不通!
使用yarn.Client提交任务到Yarn参考http://blog.sequenceiq.com/blog/2014/08/22/spark-submit-in-java/


分享,成长,快乐
脚踏实地,专注
转载请注明blog地址:http://blog.csdn.net/fansy1990





已有(4)人评论

跳转到指定楼层
轩辕依梦Q 发表于 2015-9-5 22:59:28
mark一下,多谢楼主分享,正好用上
回复

使用道具 举报

a_zhen 发表于 2015-9-6 11:48:57
要学的东西真是太多了
回复

使用道具 举报

whc9464 发表于 2019-2-12 15:16:26
看不太懂,怎么办
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条