分享

spark源码分析--rdd和stage源码分析

pig2 2014-4-2 01:40:46 发表于 代码分析 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 8 17930
1.Stage分为哪两种?
2.Stage是什么?
3.什么操作可以产生rdd?

已有(8)人评论

跳转到指定楼层
pig2 发表于 2014-4-2 01:48:42
本帖最后由 pig2 于 2014-4-2 01:53 编辑
Spark 源码分析 -- Stage



理解stage, 关键就是理解Narrow Dependency和Wide Dependency, 可能还是觉得比较难理解
关键在于是否需要shuffle, 不需要shuffle是可以随意并发的, 所以stage的边界就是需要shuffle的地方, 如下图很清楚

image

并且Stage分为两种,

shuffle map stage, in which case its tasks' results are input for another stage
其实就是,非最终stage, 后面还有其他的stage, 所以它的输出一定是需要shuffle并作为后续的输入
result stage, in which case its tasks directly compute the action that initiated a job (e.g. count(), save(), etc)
最终的stage, 没有输出, 而是直接产生结果或存储



1 stage class

这个注释写的很清楚
可以看到stage的RDD参数只有一个RDD, final RDD, 而不是一系列的RDD
因为在一个stage中的所有RDD都是map, partition不会有任何改变, 只是在data依次执行不同的map function
所以对于task scheduler而言, 一个RDD的状况就可以代表这个stage
  1. /**
  2. * A stage is a set of independent tasks all computing the same function that need to run as part
  3. * of a Spark job, where all the tasks have the same shuffle dependencies. Each DAG of tasks run
  4. * by the scheduler is split up into stages at the boundaries where shuffle occurs, and then the
  5. * DAGScheduler runs these stages in topological order.
  6. *
  7. * Each Stage can either be a shuffle map stage, in which case its tasks' results are input for
  8. * another stage, or a result stage, in which case its tasks directly compute the action that
  9. * initiated a job (e.g. count(), save(), etc). For shuffle map stages, we also track the nodes
  10. * that each output partition is on.
  11. *
  12. * Each Stage also has a jobId, identifying the job that first submitted the stage.  When FIFO
  13. * scheduling is used, this allows Stages from earlier jobs to be computed first or recovered
  14. * faster on failure.
  15. */
  16. private[spark] class Stage(
  17.     val id: Int,
  18.     val rdd: RDD[_], // final RDD
  19.     val shuffleDep: Option[ShuffleDependency[_,_]],  // Output shuffle if stage is a map stage
  20.     val parents: List[Stage], // 父stage
  21.     val jobId: Int,
  22.     callSite: Option[String])
  23.   extends Logging {
  24.   val isShuffleMap = shuffleDep != None  // 是否是shuffle map stage, 取决于是否有shuffleDep
  25.   val numPartitions = rdd.partitions.size
  26.   val outputLocs = Array.fill[List[MapStatus]](numPartitions)(Nil) // 用于buffer每个shuffle中每个maptask的MapStatus
  27.   var numAvailableOutputs = 0
  28.   private var nextAttemptId = 0
  29.   def isAvailable: Boolean = {
  30.     if (!isShuffleMap) {
  31.       true
  32.     } else {
  33.       numAvailableOutputs == numPartitions
  34.     }
  35.   }
  36. }
复制代码





2 newStage

如果是shuffle map stage, 需要在这里向mapOutputTracker注册shuffle
  1.   /**
  2.    * Create a Stage for the given RDD, either as a shuffle map stage (for a ShuffleDependency) or
  3.    * as a result stage for the final RDD used directly in an action. The stage will also be
  4.    * associated with the provided jobId.
  5.    */
  6.   private def newStage(
  7.       rdd: RDD[_],
  8.       shuffleDep: Option[ShuffleDependency[_,_]],
  9.       jobId: Int,
  10.       callSite: Option[String] = None)
  11.     : Stage =
  12.   {
  13.     if (shuffleDep != None) {
  14.       // Kind of ugly: need to register RDDs with the cache and map output tracker here
  15.       // since we can't do it in the RDD constructor because # of partitions is unknown
  16.       logInfo("Registering RDD " + rdd.id + " (" + rdd.origin + ")")
  17.       mapOutputTracker.registerShuffle(shuffleDep.get.shuffleId, rdd.partitions.size)
  18.     }
  19.     val id = nextStageId.getAndIncrement()
  20.     val stage = new Stage(id, rdd, shuffleDep, getParentStages(rdd, jobId), jobId, callSite)
  21.     stageIdToStage(id) = stage
  22.     stageToInfos(stage) = StageInfo(stage)
  23.     stage
  24.   }
复制代码




3 getMissingParentStages

  1. 可以根据final stage的deps找出所有的parent stage
  2.   private def getMissingParentStages(stage: Stage): List[Stage] = {
  3.     val missing = new HashSet[Stage]
  4.     val visited = new HashSet[RDD[_]]
  5.     def visit(rdd: RDD[_]) {
  6.       if (!visited(rdd)) {
  7.         visited += rdd
  8.         if (getCacheLocs(rdd).contains(Nil)) {
  9.           for (dep <- rdd.dependencies) {
  10.             dep match {
  11.               case shufDep: ShuffleDependency[_,_] => // 如果发现ShuffleDependency, 说明遇到新的stage
  12.                 val mapStage = getShuffleMapStage(shufDep, stage.jobId) // check shuffleToMapStage, 如果该stage已经被创建则直接返回, 否则newStage
  13.                 if (!mapStage.isAvailable) {
  14.                   missing += mapStage
  15.                 }
  16.               case narrowDep: NarrowDependency[_] => // 对于NarrowDependency, 说明仍然在这个stage中
  17.                 visit(narrowDep.rdd)
  18.             }
  19.           }
  20.         }
  21.       }
  22.     }
  23.     visit(stage.rdd)
  24.     missing.toList
  25.   }
复制代码






----------------------------------------------------------------------------------------------------------------------------
rdd和stage的生成



从一个简单的例子,来看rdd的演化,和stage是如何生成的
  1. object BaiWordCount2 {
  2.   def main(args: Array[String]) {
  3.     .....
  4.     // Create the context
  5.     val ssc = new SparkContext(args(0), "BaiWordCount", System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
  6.     val lines = ssc.textFile(args(1))//基于hadoopRdd,创建了一个MapRdd[String]
  7.     val words = lines.flatMap(_.split(" "))
  8.     val wordCounts = words.map(x =>{ println("x:"+ x);(x, 1) } )//这回返回的是一个元组了
  9.     val red = wordCounts.reduceByKey( (a,b)=>{a + b} )
  10.     red.saveAsTextFile("/root/Desktop/out")
  11.   }
  12. }
复制代码

我们首先总结出每个操作生成的rdd,这是通过源码追踪得到的:

textFile:生成一个HadoopRDD 和 一个MapPedRDD[String]

flatMap:前面生成的MapPedRDD[String],调用flatMap函数,生成了FlatMaPPedRDD[String]

map:FlatMaPPedRDD[String],调用map函数,创建了一个创建了一个MapPartitionsRDD,再通过隐式转换,生成了PairRDD[String,Int](因为map操作,产生了一对值key和value) reduceByKey:生成三个Rdd,首先根据之前的PairRDD,生成一个MapPartitionsRDD(这个RDD起到类似map-reduce里面,combine()的作用),再生成一个shuffledRdd(这个rdd是分割stage的重要依据之一),这之后再生成一个MapPartitionsRDD[String](这个rdd起到hadoop里reducer的作用) saveAsTextFile先生成一个MapPedRDD,然后调用runJob函数,将之前生成的rdd链表,提交到spark集群上,spark根局rdd的类型,划分成一个或多个stage(只有shuffledRdd这类的rdd,才会成为stage和stage之间的边界),然后将各个stage,按照依赖的先后顺序,将stage先后提交集群进行计算

下边通过textFile来详细说明rdd链表的生成过程和主要数据结构,主要注意deps和几种dependency:

从rdd生成的方式来说可以分成四类:

通过外部数据生成rdd,通过transformations函数生成,缓存操作,actions操作
textFile函数无疑属于第一种,通过外部数据,生成rdd,输入的文件路径,可以是hdfs://开头的hdfs数据,也可以本地文件路径,例如"/root/Desktop/word.text"

textFile函数调用hadoopFile 函数,生成一个hadoopRdd[K,V],默认情况下,泛型参数K和V,对应HadoopRDD的构造函数里的keyClass和valueClass。
也就是一个Rdd[LongWritable,Text],通过外部数据生成rdd的第一个rdd的特点是,deps是一个空的list,原因是它是从外部文件生成的,没有父rdd。
生成了Rdd[LongWritable,Text]后,还要调用transformations函数map:map(pair => pair._2.toString),来生成一个MappedRDD
MappedRDD(this, sc.clean(f)),这里this,就是之前生成的HadoopRDD,MappedRDD的构造函数,会调用父类的构造函数RDD[U](prev),
这个this(也就是hadoopRdd),会被赋值给prev,然后调用RDD.scala中,下面的构造函数

  1.   def this(@transient oneParent: RDD[_]) =
  2.     this(oneParent.context , List(new OneToOneDependency(oneParent)))
复制代码

这个函数的作用,是把父RDD的SparkContext(oneParent.context),和一个列表List(new OneToOneDependency(oneParent))),传入了另一个RDD的构造函数,

  1. RDD[T: ClassManifest](
  2.     @transient private var sc: SparkContext,
  3.     @transient private var deps: Seq[Dependency[_]]
  4.   )
复制代码

这样我们可以看到,在所有有父子关系的RDD,共享的是同一个SparkContext。而子RDD的deps变量,也被赋值为一个List,里面包含一个OneToOneDependency实例,表明父RDD和子RDD之间的关系 /

其实大多数的父子关系,包含的都是OneToOneDependency.比较例外的几个,比如join,这个很明显,他的数据不是来自同一个父RDD。而shuffledRdd的Dependency是ShuffledDependency

父Rdd会在子rdd的构造函数中被传入,然后放入子rdd实例的deps里面,被记录下来。这样,当我们得到一个Rdd之后,就可以向后回溯它的祖先,再结合传入的函数变量f,完整的得到它的构造过程。

flatMap,map,reduceByKey,saveAsTextFile则按顺序创建各自的rdd,然后在deps中记录父rdd,同时根据rdd的类型,生成各自的不同类型的dependency。

在saveAsTextFile函数把整个计算任务提交到集群之前,所有的函数进行的操作,仅仅就是生成rdd链表而已。saveAsTextFile是action类型的操作,action的共同特点是,会调用RunJob一类的函数,调用Dagscheduler.runJob,将最后一个rdd(在我们这个例子里,就是saveAsTextFile生成的那个MappedRdd),提交到集群上。集群会以这个rdd为参数之一,生成一个stage,名叫finalStage(故名思意,这是最终的一个stage)。然后调用submitStage,将刚刚生成的finalStage提交到集群上。这个stage是否会被马上执行呢?不一定,因为程序会调用getMissingParentStages,进行寻找,是否有需要先进行提交的stage---这个过程可以这样类比,一个查询操作,在提交之后,要先检查是否有子查询,如果有,先执行子查询,然后在执行父查询,这里的原因很简单,父查询依赖于子查询的数据。同理,在stage执行的过程中,也要先查询,它是否需要其他stage的数据(其实之后一种数据,就是通过shuffle传过来的数据),如果有,那么这些stage,就是它的missingParentStage,它要等他的missingParentStage执行成功,然后通过shuffle机制把数据传给它,才能开始执行。这个过程的执行过程如下:从最后一个rdd起,查看它的dependency的类型,如果是
shuffledDependency,则创建一个ShuffleMapStage,否则,就向前遍历,依次递归,知道最前面的rdd为止

  1. private def getMissingParentStages(stage: Stage): List[Stage] = {
  2.     .....
  3.     def visit(rdd: RDD[_]) {
  4.       if (!visited(rdd)) {/
  5.         visited += rdd
  6.         if (getCacheLocs(rdd).contains(Nil)) {
  7.           for (dep <- rdd.dependencies) {
  8.             dep match {
  9.               case shufDep: ShuffleDependency[_,_] =>
  10.                val mapStage = getShuffleMapStage(shufDep, stage.jobId)
  11.               if (!mapStage.isAvailable) {
  12.                   missing += mapStage
  13.                 }
  14.               case narrowDep: NarrowDependency[_] =>
  15.                 visit(narrowDep.rdd)
  16.             }
  17.           }
  18.         }
  19.       }
  20.     }
  21.     visit(stage.rdd)
  22.     missing.toList
  23.   }
复制代码

当getMissingParentStage(stage)的结果为空的时候,表明这个stage没有missingParentStage,或者它的missingParentStage已经都执行完了,则当前这个stage才能被成功的提交到集群去执行,否则,它就要等待,并重复调用getMissingParentStage,直到它的结果为空,才可以被提交。







出处  http://baishuo491.iteye.com/blog/2019510

回复

使用道具 举报

xiaokekehaha 发表于 2014-6-19 09:37:06
rtyhq35h3r5h
回复

使用道具 举报

ucloud 发表于 2014-6-19 20:08:04
太高级了, 还在hadoop试玩
回复

使用道具 举报

tt7734 发表于 2014-11-4 19:09:10
谢谢楼主的分享
回复

使用道具 举报

tang 发表于 2015-11-27 11:12:40
很精彩的分析
回复

使用道具 举报

Pengjx2015 发表于 2016-1-9 19:59:47
很精彩的分析
回复

使用道具 举报

Pengjx2015 发表于 2016-1-11 11:01:04
分析的很透彻
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条