分享

SparkSQL+SparkCore任务解析

问题导读



1.在我们调用spark API时,背后发生了什么呢?
2.RDD.id是在初始化时生成的,id代表什么?

3.rdd.collect()调用的效果是什么?





本文通过一个实例,介绍了混用SparkSQL与SparkCore时的任务解析过程。
实例伪码
hc.read.parquet(paths).filter(…).select(…).groupBy(all fileds).count().sort(…).toJavaRDD
.mapPartitionsToPair(…).coalesce().reduceByKey(…).collectAsMap()

SparkUI截图

在最初观察以下UI时,有几个疑问:
  • 为什么貌似“toJavaRDD”时,生成了一个独立的Job6?
  • 为什么job7里,仅跳过了stage8(应该是复用了stage6),而不跳过stage9呢?
  • 为什么job7里,stage9与stage7的执行不一致?stage7包含了3个mapPartitionsRDD调用,而stage9仅包含一个
直接解答以上问题,还比较容易。但真正理解spark对dataframe和core的任务解析过程,才能对该问题有一个完整的解答。

1.png


Job 6:

2.png


Job 7:
3.png


解析全览
以下列出了从编写代码到形成RDD的过程,并简单给出了Job形式的时间点。图较大,可以点击看原图。
  • 白色图标代表coding时的API。
  • 灰色代表code背后的逻辑概念,sparkSQL范畴里的DataFrame和LogicalPlan,以及SparkCore里的RDD,这些东西在编码时生成。
  • 蓝色是SparkSQL对logicalPlan进行analyze、optimize、plan后生成的物理执行计划。
  • 黄色是prepareForExecution阶段,在上一步物理计划基础上,又添加形成的最终物理执行计划。

4.png


在我们调用spark API时,背后发生了什么呢?
这个问题得分开看。
在SparkCore里,比较简单,可以理解为每个API都在之前RDD的基础上形成新的RDD,如全览图“主Job RDDs”一列下半段所示。
但SparkSQL里,就稍有不同,它的数据抽象是比RDD更高层次的DataFrame,即每个API都在之前DF的基础上生成新的DF。而DF的核心是LogicalPlan,它描述了plan的依赖关系、partition、Distribution等。如全览图“DataFrame”和“LogicalPlan”两列所示。
但不管RDD还是DataFrame,都是lazy的,只有在调用collect、save这样的方法时,才会真正触发执行。
toJavaRDD的效果
调用该方法时,会触发dataframe的解析(全览图标注为第1步):

lazy val rdd: RDD[Row] = {  // use a local variable to make sure the map closure doesn't capture the whole DataFrame  val schema = this.schema  queryExecution.executedPlan.execute().mapPartitions { rows =>    val converter = CatalystTypeConverters.createToScalaConverter(schema)    rows.map(converter(_).asInstanceOf[Row])  }}
上面的queryExecution.executedPlan会触发以下一系列动作( 注意,不包含execute()调用 ),完成语法解析、类型适配、优化等任务,最重要的是,会把逻辑计划真正翻译为物理执行计划!在planner.plan()完成后,会生成全览图里execution.SparkPlan蓝色部分;prepareForExecution结束后,会生成execution.SparkPlan黄色部分(全览图标注为第2、3步)。


5.png


plan.execute()调用的效果
这时会在driver端,递归的触发物理执行计划的doExecute()方法,这些方法一般都是返回对应的RDD。但在这个case里,由于调用了sort方法,生成了RangePartitioning对应的Exchange计划,为了实现排序后数据的均匀分布,spark会生成一个子job,对排序所依赖的RDD进行抽样,也就是说,会额外生成“Sort抽样子Job RDDs”一列,并由以下代码触发job的执行:

[mw_shl_code=bash,true]/*Partitioner.RangePartitioner */

  def sketch[K : ClassTag](
      rdd: RDD[K],
      sampleSizePerPartition: Int): (Long, Array[(Int, Int, Array[K])]) = {
    val shift = rdd.id
    // val classTagK = classTag[K] // to avoid serializing the entire partitioner object
    val sketched = rdd.mapPartitionsWithIndex { (idx, iter) =>
      val seed = byteswap32(idx ^ (shift << 16))
      val (sample, n) = SamplingUtils.reservoirSampleAndCount(
        iter, sampleSizePerPartition, seed)
      Iterator((idx, n, sample))
    }.collect()
    val numItems = sketched.map(_._2.toLong).sum
    (numItems, sketched)
  }[/mw_shl_code]


该job对应UI上的Job6,而且由于该子job是提前执行的,所以能看到它的job id较小。
该步骤触发子job只是附带效果,真正的目的是完成主job物理计划到RDD的转化,全览图中,主子RDDs其实有很大一部分是重用的。原因是,在ExternalSort之前的Exchange里,childRdd = child.execute(),该rdd既被RangePartitioner使用,也被返回的ShuffledRDD使用。所以是一致的。
更详细地看下job6和7的RDD编号:
6.png
  • #279(含)之前的RDD都是主子job复用的
  • 子job的RDD号比主job的小,所以子job确实是先调度的
RDD.id是在初始化时生成的,所以代表着,以上的RDD也按数字顺序生成。

[mw_shl_code=bash,true]  protected override def doExecute(): RDD[Row] = attachTree(this, "sort") {
    child.execute().mapPartitions( { iterator =>[/mw_shl_code]

由于execute都是递归调用的,所以可以保证子child先执行,其对应的RDD先生成。
rdd.collect()调用的效果
终于轮到正主来了。它的执行就比较简单了,生成ResultStage,并递归触发所依赖的两个ShuffleStage先执行。
问题解答
  • 为什么貌似“toJavaRDD”时,生成了一个独立的Job6?
    • 因为sort触发了子job,对数据进行抽样,以实现排序后更均匀的分布
  • 为什么job7里,仅跳过了stage8(应该是复用了stage6),而不跳过stage9呢?
    • stage 6和stage 8的执行任务是一致的,但stage 7和stage 9其实是两码事,具体如下:
  • stage 6:hc.read.parquet(paths).filter(…).select(…)  + groupBy(all fileds).count()的前半段
  • stage 7:groupBy(all fileds).count() 后半段,以及抽样过程,阐述RangePartitioner
  • stage 8:被跳过,复用了stage6
  • stage 9:groupBy(all fileds).count() 后半段 + sort的前半段
  • stage 10:sort(…).toJavaRDD.mapPartitionsToPair(…).coalesce() + reduceByKey(…)的前半段
  • 为什么job7里,stage9与stage7的执行不一致?stage7包含了3个mapPartitionsRDD调用,而stage9仅包含一个
    • 解答与上面一样
经验与教训
1. 请考虑,如果hc.read.parquet().filter().select().sort().toJavaRDD.mapPartitions会如何呢?
这时同样会生成两个job,且 都是从hdfs读取数据了 ~~ 因为第二个job的sort前面没有shuffle dependency,没有办法复用第一个job的stage了。
2. df.sort与rdd.repartitionAndSort的方法选择上,之前认为sparksql进行了很多数据结构和执行计划方面的优化,sort的性能可能更好。但分析后发现,它还会做一个sample操作,所以哪个性能更高,还真不好说了。至少在我们的场景下,两者性能持平。而鉴于sort上面的小坑,倾向于使用后者。






已有(5)人评论

跳转到指定楼层
hw12321 发表于 2015-11-30 09:47:55
这个比较有用!
回复

使用道具 举报

yangchenwo 发表于 2015-11-30 17:32:55
版主写的很好 但是还是第四张图看不清楚啊,能不能传到百度云一下,发个地址让我们下一下。
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条