分享

Apache Spark源码走读之2 -- Job的提交与运行

本帖最后由 pig2 于 2015-1-6 14:09 编辑


问题导读

1、spark实验环境搭建流程是什么?
2、spark中job的生成和运行流程具体步骤是?

3、Spark布置环境中组件构成分为哪几部分?



概要
本文以wordCount为例,详细说明spark创建和运行job的过程,重点是在进程及线程的创建。

实验环境搭建
在进行后续操作前,确保下列条件已满足。
  • 下载spark binary 0.9.1
  • 安装scala
  • 安装sbt
  • 安装java


启动spark-shell
单机模式运行,即local模式
local模式运行非常简单,只要运行以下命令即可,假设当前目录是$SPARK_HOME
  1. MASTER=local bin/spark-shell
复制代码
"MASTER=local"就是表明当前运行在单机模式

local cluster方式运行
local cluster模式是一种伪cluster模式,在单机环境下模拟standalone的集群,启动顺序分别如下
  • 启动master
  • 启动worker
  • 启动spark-shell


master
  1. $SPARK_HOME/sbin/start-master.sh
复制代码
注意运行时的输出,日志默认保存在$SPARK_HOME/logs目录。
master主要是运行类 org.apache.spark.deploy.master.Master,在8080端口启动监听,日志如下图所示



修改配置
  • 进入$SPARK_HOME/conf目录
  • 将spark-env.sh.template重命名为spark-env.sh
  • 修改spark-env.sh,添加如下内容
export SPARK_MASTER_IP=localhostexport SPARK_LOCAL_IP=localhost

运行worker
  1. bin/spark-class org.apache.spark.deploy.worker.Worker spark://localhost:7077 -i 127.0.0.1  -c 1 -m 512M
复制代码


worker启动完成,连接到master。打开maser的web ui可以看到连接上来的worker. Master WEb UI的监听地址是http://localhost:8080
启动spark-shell
  1. MASTER=spark://localhost:7077 bin/spark-shell
复制代码

如果一切顺利,将看到下面的提示信息。

  1. Created spark context..Spark context available as sc.
复制代码

可以用浏览器打开localhost:4040来查看如下内容

  • stages
  • storage
  • environment
  • executors
wordcount

上述环境准备妥当之后,我们在sparkshell中运行一下最简单的例子,在spark-shell中输入如下代码

  1. scala>sc.textFile("README.md").filter(_.contains("Spark")).count
复制代码

上述代码统计在README.md中含有Spark的行数有多少


部署过程详解

Spark布置环境中组件构成如下图所示。



174137yjakqapqxpv6siip.png



  • Driver Program 简要来说在spark-shell中输入的wordcount语句对应于上图的Driver Program.
  • Cluster Manager 就是对应于上面提到的master,主要起到deploy management的作用
  • Worker Node 与Master相比,这是slave node。上面运行各个executor,executor可以对应于线程。executor处理两种基本的业务逻辑,一种就是driver programme,另一种就是job在提交之后拆分成各个stage,每个stage可以运行一到多个task
Notes: 在集群(cluster)方式下, Cluster Manager运行在一个jvm进程之中,而worker运行在另一个jvm进程中。在local cluster中,这些jvm进程都在同一台机器中,如果是真正的standalone或Mesos及Yarn集群,worker与master或分布于不同的主机之上。

JOB的生成和运行
job生成的简单流程如下
  • 首先应用程序创建SparkContext的实例,如实例为sc
  • 利用SparkContext的实例来创建生成RDD
  • 经过一连串的transformation操作,原始的RDD转换成为其它类型的RDD
  • 当action作用于转换之后RDD时,会调用SparkContext的runJob方法
  • sc.runJob的调用是后面一连串反应的起点,关键性的跃变就发生在此处


调用路径大致如下
  • sc.runJob->dagScheduler.runJob->submitJob
  • DAGScheduler::submitJob会创建JobSummitted的event发送给内嵌类eventProcessActor
  • eventProcessActor在接收到JobSubmmitted之后调用processEvent处理函数
  • job到stage的转换,生成finalStage并提交运行,关键是调用submitStage
  • 在submitStage中会计算stage之间的依赖关系,依赖关系分为宽依赖和窄依赖两种
  • 如果计算中发现当前的stage没有任何依赖或者所有的依赖都已经准备完毕,则提交task
  • 提交task是调用函数submitMissingTasks来完成
  • task真正运行在哪个worker上面是由TaskScheduler来管理,也就是上面的submitMissingTasks会调用TaskScheduler::submitTasks
  • TaskSchedulerImpl中会根据Spark的当前运行模式来创建相应的backend,如果是在单机运行则创建LocalBackend
  • LocalBackend收到TaskSchedulerImpl传递进来的ReceiveOffers事件
  • receiveOffers->executor.launchTask->TaskRunner.run


代码片段executor.lauchTask

  1. <font face="Tahoma"><font size="2"><font color="#000000">def launchTask(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer) {
  2.     val tr = new TaskRunner(context, taskId, serializedTask)
  3.     runningTasks.put(taskId, tr)
  4.     threadPool.execute(tr)
  5.   }</font></font></font>
复制代码



说了这么一大通,也就是讲最终的逻辑处理切切实实是发生在TaskRunner这么一个executor之内。
运算结果是包装成为MapStatus然后通过一系列的内部消息传递,反馈到DAGScheduler,这一个消息传递路径不是过于复杂,有兴趣可以自行勾勒。


相关内容


Apache Spark源码走读之1 -- Spark论文阅读笔记

Apache Spark源码走读之2 -- Job的提交与运行

Apache Spark源码走读之3-- Task运行期之函数调用关系分析

Apache Spark源码走读之4 -- DStream实时流数据处理

Apache Spark源码走读之5-- DStream处理的容错性分析

Apache Spark源码走读之6-- 存储子系统分析

Apache Spark源码走读之7 -- Standalone部署方式分析

Apache Spark源码走读之8 -- Spark on Yarn

Apache Spark源码走读之9 -- Spark源码编译

Apache Spark源码走读之10 -- 在YARN上运行SparkPi

Apache Spark源码走读之11 -- sql的解析与执行

Apache Spark源码走读之12 -- Hive on Spark运行环境搭建

Apache Spark源码走读之13 -- hiveql on spark实现详解

Apache Spark源码走读之14 -- Graphx实现剖析

Apache Spark源码走读之15 -- Standalone部署模式下的容错性分析

Apache Spark源码走读之16 -- spark repl实现详解

Apache Spark源码走读之17 -- 如何进行代码跟读

Apache Spark源码走读之18 -- 使用Intellij idea调试Spark源码

Apache Spark源码走读之19 -- standalone cluster模式下资源的申请与释放

Apache Spark源码走读之20 -- ShuffleMapTask计算结果的保存与读取

Apache Spark源码走读之21 -- WEB UI和Metrics初始化及数据更新过程分析

Apache Spark源码走读之22 -- 浅谈mllib中线性回归的算法实现

Apache Spark源码走读之23 -- Spark MLLib中拟牛顿法L-BFGS的源码实现

Apache Spark源码走读之24 -- Sort-based Shuffle的设计与实现






没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条