分享

Apache Spark源码走读之24 -- Sort-based Shuffle的设计与实现

本帖最后由 pig2 于 2015-1-6 14:19 编辑

问题导读
1.sort-based shuffle算法会产生哪些中间文件?
2.如果是使用SORT,效果如何呢?











概要
Spark 1.1中对spark core的一个重大改进就是引入了sort-based shuffle处理机制,本文就该处理机制的实现进行初步的分析。

Sort-based Shuffle之初体验
通过一个小的实验来直观的感受一下sort-based shuffle算法会产生哪些中间文件,具体实验步骤如下所述。

步骤1: 修改conf/spark-default.conf, 加入如下内容

  1. spark.shuffle.manager SORT
复制代码
步骤2: 运行spark-shell
  1. SPARK_LOCAL_IP=127.0.0.1 $SPARK_HOME/bin/spark-shell
复制代码
步骤3: 执行wordcount
  1. sc.textFile("README.md").flatMap(l => l.split(" ")).map(w=>(w,1)).reduceByKey(_ + _).collect
复制代码
步骤4: 查看生成的中间文件
  1. find /tmp/spark-local* -type f
复制代码
文件查找结果如下所示
  1. /tmp/spark-local-20140919091822-aa66/0f/shuffle_0_1_0.index
  2. /tmp/spark-local-20140919091822-aa66/30/shuffle_0_0_0.index
  3. /tmp/spark-local-20140919091822-aa66/0c/shuffle_0_0_0.data
  4. /tmp/spark-local-20140919091822-aa66/15/shuffle_0_1_0.data
复制代码
可以看到生成了两人种后缀的文件,分别为data和index类型,这两者的用途在后续分析中会详细讲述。

如果我们做一下对比实验,将shuffle模式改为Hash,再来观看生成的文件,就会找到区别。将原先配置文件中的SORT改为HASH,重新启动spark-shell,执行相同的wordcount之后,在tmp目录下找到的文件列表如下。
  1. /tmp/spark-local-20140919092949-14cc/10/shuffle_0_1_3
  2. /tmp/spark-local-20140919092949-14cc/0f/shuffle_0_1_2
  3. /tmp/spark-local-20140919092949-14cc/0f/shuffle_0_0_3
  4. /tmp/spark-local-20140919092949-14cc/0c/shuffle_0_0_0
  5. /tmp/spark-local-20140919092949-14cc/0d/shuffle_0_1_0
  6. /tmp/spark-local-20140919092949-14cc/0d/shuffle_0_0_1
  7. /tmp/spark-local-20140919092949-14cc/0e/shuffle_0_1_1
  8. /tmp/spark-local-20140919092949-14cc/0e/shuffle_0_0_2
复制代码
两者生成的文件数量差异非常大,具体数值计算如下
  • 在HASH模式下,每一次shuffle会生成M*R的数量的文件,如上述wordcount例子中,整个job有一次shuffle过程,由于输入文件默认分片为2,故M个数为2,而spark.default.parallelism配置的值为4,故R为4,所以总共生成1*2*4=8个文件。shuffle_0_1_2解读为shuffle+shuffle_id+map_id+reduce_id,故0_1_2表示由第0次shuffle中的第1个maptask生成的文件,该文件内容会被第2个reduce task消费
  • 在SORT模式下,一个Map Task只生成一个文件,而不管生成的文件要被多少的Reduce消费,故文件个数是M的数量,由于wordcount中的默认分片为2,故只生成两个data文件

多次shuffle
刚才的示例中只有一次shuffle过程,我们可以通过小小的改动来达到两次shuffle,代码如下
  1. sc.textFile("README.md").flatMap(l => l.split(" ")).map(w => (w,1)).reduceByKey(_ + _).map(p=>(p._2,p._1)).groupByKey.collect
复制代码
上述代码将reduceByKey的结果通过map进行反转,即将原来的(w, count)转换为(count,w),然后根据出现次数进行归类。 groupByKey会再次导致数据shuffle过程。

在HASH模式下产生的文件如下所示
  1. /tmp/spark-local-20140919094531-1cb6/12/shuffle_0_3_3
  2. /tmp/spark-local-20140919094531-1cb6/0c/shuffle_0_0_0
  3. /tmp/spark-local-20140919094531-1cb6/11/shuffle_0_2_3
  4. /tmp/spark-local-20140919094531-1cb6/11/shuffle_0_3_2
  5. /tmp/spark-local-20140919094531-1cb6/11/shuffle_1_1_3
  6. /tmp/spark-local-20140919094531-1cb6/10/shuffle_0_2_2
  7. /tmp/spark-local-20140919094531-1cb6/10/shuffle_0_1_3
  8. /tmp/spark-local-20140919094531-1cb6/10/shuffle_0_3_1
  9. /tmp/spark-local-20140919094531-1cb6/10/shuffle_1_0_3
  10. /tmp/spark-local-20140919094531-1cb6/10/shuffle_1_1_2
  11. /tmp/spark-local-20140919094531-1cb6/0f/shuffle_0_0_3
  12. /tmp/spark-local-20140919094531-1cb6/0f/shuffle_0_3_0
  13. /tmp/spark-local-20140919094531-1cb6/0f/shuffle_0_2_1
  14. /tmp/spark-local-20140919094531-1cb6/0f/shuffle_0_1_2
  15. /tmp/spark-local-20140919094531-1cb6/0f/shuffle_1_0_2
  16. /tmp/spark-local-20140919094531-1cb6/0f/shuffle_1_1_1
  17. /tmp/spark-local-20140919094531-1cb6/0d/shuffle_0_0_1
  18. /tmp/spark-local-20140919094531-1cb6/0d/shuffle_0_1_0
  19. /tmp/spark-local-20140919094531-1cb6/0d/shuffle_1_0_0
  20. /tmp/spark-local-20140919094531-1cb6/0e/shuffle_0_2_0
  21. /tmp/spark-local-20140919094531-1cb6/0e/shuffle_0_1_1
  22. /tmp/spark-local-20140919094531-1cb6/0e/shuffle_0_0_2
  23. /tmp/spark-local-20140919094531-1cb6/0e/shuffle_1_0_1
  24. /tmp/spark-local-20140919094531-1cb6/0e/shuffle_1_1_0
复制代码
引入一次新的shuffle,产生了大量的中间文件
如果是使用SORT,效果如何呢?只会增加M个文件,由于在新的shuffle过程中,map task数目为4,所以总共的文件是2+4=6。
  1. /tmp/spark-local-20140919094731-034a/29/shuffle_0_3_0.data
  2. /tmp/spark-local-20140919094731-034a/30/shuffle_0_0_0.index
  3. /tmp/spark-local-20140919094731-034a/15/shuffle_0_1_0.data
  4. /tmp/spark-local-20140919094731-034a/36/shuffle_0_2_0.data
  5. /tmp/spark-local-20140919094731-034a/0c/shuffle_0_0_0.data
  6. /tmp/spark-local-20140919094731-034a/32/shuffle_0_2_0.index
  7. /tmp/spark-local-20140919094731-034a/32/shuffle_1_1_0.index
  8. /tmp/spark-local-20140919094731-034a/0f/shuffle_0_1_0.index
  9. /tmp/spark-local-20140919094731-034a/0f/shuffle_1_0_0.index
  10. /tmp/spark-local-20140919094731-034a/0a/shuffle_1_1_0.data
  11. /tmp/spark-local-20140919094731-034a/2b/shuffle_1_0_0.data
  12. /tmp/spark-local-20140919094731-034a/0d/shuffle_0_3_0.index
复制代码
值得指出的是shuffle_0和shuffle_1的执行次序问题,数字越大越先执行,由于spark job提交的时候是从后往前倒推的,故0是最后将执行,而前面的先执行。

Sort-based Shuffle的设计思想
sort-based shuffle的总体指导思想是一个map task最终只生成一个shuffle文件,那么后续的reduce task是如何从这一个shuffle文件中得到自己的partition呢,这个时候就需要引入一个新的文件类型即index文件。

其具体实现步骤如下:
  • Map Task在读取自己输入的partition之后,将计算结果写入到ExternalSorter
  • ExternalSorter会使用一个map来存储新的计算结果,新的计算结果根据partiton分类,如果是有combine操作,则需要将新的值与原有的值进行合并
  • 如果ExternalSorter中的map占用的内存已经超越了使用的阀值,则将map中的内容spill到磁盘中,每一次spill产生一个不同的文件
  • 当输入Partition中的所有数据都已经处理完毕之后,这时有可能一部分计算结果在内存中,另一部分计算结果在spill的一到多个文件之中,这时通过merge操作将内存和spill文件中的内容合并整到一个文件里
  • 最后将每一个partition的在data文件中的起始位置和结束位置写入到index文件

相应的源文件
  • SortShuffleManager.scala
  • SortShuffleWriter.scala
  • ExternalSorter.scala
  • IndexShuffleBlockManager.scala

几个重要的函数
SortShuffleWriter.write
  1. override def write(records: Iterator[_ >: Product2[K, V]]): Unit = {
  2.     if (dep.mapSideCombine) {
  3.       if (!dep.aggregator.isDefined) {
  4.         throw new IllegalStateException("Aggregator is empty for map-side combine")
  5.       }
  6.       sorter = new ExternalSorter[K, V, C](
  7.         dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
  8.       sorter.insertAll(records)
  9.     } else {
  10.       // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't
  11.       // care whether the keys get sorted in each partition; that will be done on the reduce side
  12.       // if the operation being run is sortByKey.
  13.       sorter = new ExternalSorter[K, V, V](
  14.         None, Some(dep.partitioner), None, dep.serializer)
  15.       sorter.insertAll(records)
  16.     }
  17.     val outputFile = shuffleBlockManager.getDataFile(dep.shuffleId, mapId)
  18.     val blockId = shuffleBlockManager.consolidateId(dep.shuffleId, mapId)
  19.     val partitionLengths = sorter.writePartitionedFile(blockId, context, outputFile)
  20.     shuffleBlockManager.writeIndexFile(dep.shuffleId, mapId, partitionLengths)
  21.     mapStatus = new MapStatus(blockManager.blockManagerId,
  22.       partitionLengths.map(MapOutputTracker.compressSize))
  23.   }
复制代码
ExternalSorter.insertAll
  1. def insertAll(records: Iterator[_  {
  2.         if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
  3.       }
  4.       while (records.hasNext) {
  5.         elementsRead += 1
  6.         kv = records.next()
  7.         map.changeValue((getPartition(kv._1), kv._1), update)
  8.         maybeSpill(usingMap = true)
  9.       }
  10.     } else {
  11.       // Stick values into our buffer
  12.       while (records.hasNext) {
  13.         elementsRead += 1
  14.         val kv = records.next()
  15.         buffer.insert((getPartition(kv._1), kv._1), kv._2.asInstanceOf[C])
  16.         maybeSpill(usingMap = false)
  17.       }
  18.     }
  19.   }
复制代码
writePartitionedFile将内存中的数据和spill文件中内容一起合并到一个文件当中
  1. def writePartitionedFile(
  2.       blockId: BlockId,
  3.       context: TaskContext,
  4.       outputFile: File): Array[Long] = {
  5.     // Track location of each range in the output file
  6.     val lengths = new Array[Long](numPartitions)
  7.     if (bypassMergeSort && partitionWriters != null) {
  8.       // We decided to write separate files for each partition, so just concatenate them. To keep
  9.       // this simple we spill out the current in-memory collection so that everything is in files.
  10.       spillToPartitionFiles(if (aggregator.isDefined) map else buffer)
  11.       partitionWriters.foreach(_.commitAndClose())
  12.       var out: FileOutputStream = null
  13.       var in: FileInputStream = null
  14.       try {
  15.         out = new FileOutputStream(outputFile)
  16.         for (i <- 0 until numPartitions) {
  17.           in = new FileInputStream(partitionWriters(i).fileSegment().file)
  18.           val size = org.apache.spark.util.Utils.copyStream(in, out, false)
  19.           in.close()
  20.           in = null
  21.           lengths(i) = size
  22.         }
  23.       } finally {
  24.         if (out != null) {
  25.           out.close()
  26.         }
  27.         if (in != null) {
  28.           in.close()
  29.         }
  30.       }
  31.     } else {
  32.       // Either we're not bypassing merge-sort or we have only in-memory data; get an iterator by
  33.       // partition and just write everything directly.
  34.       for ((id, elements) <- this.partitionedIterator) {
  35.         if (elements.hasNext) {
  36.           val writer = blockManager.getDiskWriter(
  37.             blockId, outputFile, ser, fileBufferSize, context.taskMetrics.shuffleWriteMetrics.get)
  38.           for (elem
复制代码
而数据读取过程中则需要使用IndexShuffleBlockManager来获取Partiton的具体位置
  1. override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = {
  2.     // The block is actually going to be a range of a single map output file for this map, so
  3.     // find out the consolidated file, then the offset within that from our index
  4.     val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId)
  5.     val in = new DataInputStream(new FileInputStream(indexFile))
  6.     try {
  7.       in.skip(blockId.reduceId * 8)
  8.       val offset = in.readLong()
  9.       val nextOffset = in.readLong()
  10.       new FileSegmentManagedBuffer(
  11.         getDataFile(blockId.shuffleId, blockId.mapId),
  12.         offset,
  13.         nextOffset - offset)
  14.     } finally {
  15.       in.close()
  16.     }
  17.   }
复制代码


参数资料

相关内容


Apache Spark源码走读之1 -- Spark论文阅读笔记

Apache Spark源码走读之2 -- Job的提交与运行

Apache Spark源码走读之3-- Task运行期之函数调用关系分析

Apache Spark源码走读之4 -- DStream实时流数据处理

Apache Spark源码走读之5-- DStream处理的容错性分析

Apache Spark源码走读之6-- 存储子系统分析

Apache Spark源码走读之7 -- Standalone部署方式分析

Apache Spark源码走读之8 -- Spark on Yarn

Apache Spark源码走读之9 -- Spark源码编译

Apache Spark源码走读之10 -- 在YARN上运行SparkPi

Apache Spark源码走读之11 -- sql的解析与执行

Apache Spark源码走读之12 -- Hive on Spark运行环境搭建

Apache Spark源码走读之13 -- hiveql on spark实现详解

Apache Spark源码走读之14 -- Graphx实现剖析

Apache Spark源码走读之15 -- Standalone部署模式下的容错性分析

Apache Spark源码走读之16 -- spark repl实现详解

Apache Spark源码走读之17 -- 如何进行代码跟读

Apache Spark源码走读之18 -- 使用Intellij idea调试Spark源码

Apache Spark源码走读之19 -- standalone cluster模式下资源的申请与释放

Apache Spark源码走读之20 -- ShuffleMapTask计算结果的保存与读取

Apache Spark源码走读之21 -- WEB UI和Metrics初始化及数据更新过程分析

Apache Spark源码走读之22 -- 浅谈mllib中线性回归的算法实现

Apache Spark源码走读之23 -- Spark MLLib中拟牛顿法L-BFGS的源码实现

Apache Spark源码走读之24 -- Sort-based Shuffle的设计与实现


欢迎加入about云群90371779322273151432264021 ,云计算爱好者群,亦可关注about云腾讯认证空间||关注本站微信

没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条