分享

Spark源码系列(六)Shuffle的过程解析

本帖最后由 pig2 于 2014-9-14 12:42 编辑


问题导读:
1、shuffle过程的划分?
2、shuffle的中间结果如何存储?
3、shuffle的数据如何拉取过来?





Shuffle过程的划分
Spark的操作模型是基于RDD的,当调用RDD的reduceByKey、groupByKey等类似的操作的时候,就需要有shuffle了。再拿出reduceByKey这个来讲。

  1.   def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] = {
  2.     reduceByKey(new HashPartitioner(numPartitions), func)
  3.   }
复制代码



reduceByKey的时候,我们可以手动设定reduce的个数,如果不指定的话,就可能不受控制了。

  1. def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
  2.     val bySize = (Seq(rdd) ++ others).sortBy(_.partitions.size).reverse
  3.     for (r <- bySize if r.partitioner.isDefined) {
  4.       return r.partitioner.get
  5.     }
  6.     if (rdd.context.conf.contains("spark.default.parallelism")) {
  7.       new HashPartitioner(rdd.context.defaultParallelism)
  8.     } else {
  9.       new HashPartitioner(bySize.head.partitions.size)
  10.     }
  11.   }
复制代码



如果不指定reduce个数的话,就按默认的走:

1、如果自定义了分区函数partitioner的话,就按你的分区函数来走。
2、如果没有定义,那么如果设置了spark.default.parallelism,就使用哈希的分区方式,reduce个数就是设置的这个值。
3、如果这个也没设置,那就按照输入数据的分片的数量来设定。如果是hadoop的输入数据的话,这个就多了。。。大家可要小心啊。

设定完之后,它会做三件事情,也就是之前讲的3次RDD转换。


  1. //map端先按照key合并一次
  2. val combined = self.mapPartitionsWithContext((context, iter) => {
  3.         aggregator.combineValuesByKey(iter, context)
  4. }, preservesPartitioning = true)
  5. //reduce抓取数据
  6. val partitioned = new ShuffledRDD[K, C, (K, C)](combined, partitioner).setSerializer(serializer)
  7. //合并数据,执行reduce计算
  8. partitioned.mapPartitionsWithContext((context, iter) => {
  9.         new InterruptibleIterator(context, aggregator.combineCombinersByKey(iter, context))
  10. }, preservesPartitioning = true)
复制代码





1、在第一个MapPartitionsRDD这里先做一次map端的聚合操作。
2、SHuffledRDD主要是做从这个抓取数据的工作。
3、第二个MapPartitionsRDD把抓取过来的数据再次进行聚合操作。
4、步骤1和步骤3都会涉及到spill的过程。

怎么做的聚合操作,回去看RDD那章。


Shuffle的中间结果如何存储
作业提交的时候,DAGScheduler会把Shuffle的过程切分成map和reduce两个Stage(之前一直被我叫做shuffle前和shuffle后),具体的切分的位置在上图的虚线处。

map端的任务会作为一个ShuffleMapTask提交,最后在TaskRunner里面调用了它的runTask方法。


  1. override def runTask(context: TaskContext): MapStatus = {
  2.     val numOutputSplits = dep.partitioner.numPartitions
  3.     metrics = Some(context.taskMetrics)
  4.     val blockManager = SparkEnv.get.blockManager
  5.     val shuffleBlockManager = blockManager.shuffleBlockManager
  6.     var shuffle: ShuffleWriterGroup = null
  7.     var success = false
  8.     try {
  9.       // serializer为空的情况调用默认的JavaSerializer,也可以通过spark.serializer来设置成别的
  10.       val ser = Serializer.getSerializer(dep.serializer)
  11.       // 实例化Writer,Writer的数量=numOutputSplits=前面我们说的那个reduce的数量
  12.       shuffle = shuffleBlockManager.forMapTask(dep.shuffleId, partitionId, numOutputSplits, ser)
  13.       // 遍历rdd的元素,按照key计算出来它所在的bucketId,然后通过bucketId找到相应的Writer写入
  14.       for (elem <- rdd.iterator(split, context)) {
  15.         val pair = elem.asInstanceOf[Product2[Any, Any]]
  16.         val bucketId = dep.partitioner.getPartition(pair._1)
  17.         shuffle.writers(bucketId).write(pair)
  18.       }
  19.       // 提交写入操作. 计算每个bucket block的大小
  20.       var totalBytes = 0L
  21.       var totalTime = 0L
  22.       val compressedSizes: Array[Byte] = shuffle.writers.map { writer: BlockObjectWriter =>
  23.         writer.commit()
  24.         writer.close()
  25.         val size = writer.fileSegment().length
  26.         totalBytes += size
  27.         totalTime += writer.timeWriting()
  28.         MapOutputTracker.compressSize(size)
  29.       }
  30.       // 更新 shuffle 监控参数.
  31.       val shuffleMetrics = new ShuffleWriteMetrics
  32.       shuffleMetrics.shuffleBytesWritten = totalBytes
  33.       shuffleMetrics.shuffleWriteTime = totalTime
  34.       metrics.get.shuffleWriteMetrics = Some(shuffleMetrics)
  35.       success = true
  36.       new MapStatus(blockManager.blockManagerId, compressedSizes)
  37.     } catch { case e: Exception =>
  38.       // 出错了,取消之前的操作,关闭writer
  39.       if (shuffle != null && shuffle.writers != null) {
  40.         for (writer <- shuffle.writers) {
  41.           writer.revertPartialWrites()
  42.           writer.close()
  43.         }
  44.       }
  45.       throw e
  46.     } finally {
  47.       // 关闭writer
  48.       if (shuffle != null && shuffle.writers != null) {
  49.         try {
  50.           shuffle.releaseWriters(success)
  51.         } catch {
  52.           case e: Exception => logError("Failed to release shuffle writers", e)
  53.         }
  54.       }
  55.       // 执行注册的回调函数,一般是做清理工作
  56.       context.executeOnCompleteCallbacks()
  57.     }
  58.   }
复制代码



遍历每一个记录,通过它的key来确定它的bucketId,再通过这个bucket的writer写入数据。

下面我们看看ShuffleBlockManager的forMapTask方法吧。


  1. def forMapTask(shuffleId: Int, mapId: Int, numBuckets: Int, serializer: Serializer) = {
  2.     new ShuffleWriterGroup {
  3.       shuffleStates.putIfAbsent(shuffleId, new ShuffleState(numBuckets))
  4.       private val shuffleState = shuffleStates(shuffleId)
  5.       private var fileGroup: ShuffleFileGroup = null
  6.       val writers: Array[BlockObjectWriter] = if (consolidateShuffleFiles) {
  7.         fileGroup = getUnusedFileGroup()
  8.         Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
  9.           val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
  10.       // 从已有的文件组里选文件,一个bucket一个文件,即要发送到同一个reduce的数据写入到同一个文件
  11.           blockManager.getDiskWriter(blockId, fileGroup(bucketId), serializer, bufferSize)
  12.         }
  13.       } else {
  14.         Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
  15.           // 按照blockId来生成文件,文件数为map数*reduce数
  16.           val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
  17.           val blockFile = blockManager.diskBlockManager.getFile(blockId)
  18.           if (blockFile.exists) {
  19.             if (blockFile.delete()) {
  20.               logInfo(s"Removed existing shuffle file $blockFile")
  21.             } else {
  22.               logWarning(s"Failed to remove existing shuffle file $blockFile")
  23.             }
  24.           }
  25.           blockManager.getDiskWriter(blockId, blockFile, serializer, bufferSize)
  26.         }
  27.       }
复制代码



1、map的中间结果是写入到本地硬盘的,而不是内存。
2、默认是一个map的中间结果文件是M*R(M=map数量,R=reduce的数量),设置了spark.shuffle.consolidateFiles为true之后是R个文件,根据bucketId把要分到同一个reduce的结果写入到一个文件中。
3、consolidateFiles采用的是一个reduce一个文件,它还记录了每个map的写入起始位置,所以查找的时候,先通过reduceId查找到哪个文件,再同坐mapId查找索引当中的起始位置offset,长度length=(mapId + 1).offset -(mapId).offset,这样就可以确定一个FileSegment(file, offset, length)。
4、Finally,存储结束之后, 返回了一个new MapStatus(blockManager.blockManagerId, compressedSizes),把blockManagerId和block的大小都一起返回。

个人想法,shuffle这块和hadoop的机制差别不大,tez这样的引擎会赶上spark的速度呢?还是让我们拭目以待吧!


Shuffle的数据如何拉取过来
ShuffleMapTask结束之后,最后走到DAGScheduler的handleTaskCompletion方法当中(关于中间的过程,请看《图解作业生命周期》)。


  1. case smt: ShuffleMapTask =>
  2. val status = event.result.asInstanceOf[MapStatus]
  3. val execId = status.location.executorId
  4. if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) {
  5.     logInfo("Ignoring possibly bogus ShuffleMapTask completion from " + execId)
  6. } else {
  7.     stage.addOutputLoc(smt.partitionId, status)
  8. }
  9. if (runningStages.contains(stage) && pendingTasks(stage).isEmpty) {
  10.     markStageAsFinished(stage)
  11.     if (stage.shuffleDep.isDefined) {
  12.          // 真的map过程才会有这个依赖,reduce过程None
  13.          mapOutputTracker.registerMapOutputs(
  14.      stage.shuffleDep.get.shuffleId,
  15.          stage.outputLocs.map(list => if (list.isEmpty) null else list.head).toArray,
  16.          changeEpoch = true)
  17.      }
  18.       clearCacheLocs()
  19.       if (stage.outputLocs.exists(_ == Nil)) {
  20.           // 一些任务失败了,需要重新提交stage
  21.           submitStage(stage)
  22.        } else {
  23.           // 提交下一批任务              
  24.    }
  25. }
复制代码



1、把结果添加到Stage的outputLocs数组里,它是按照数据的分区Id来存储映射关系的partitionId->MapStaus。
2、stage结束之后,通过mapOutputTracker的registerMapOutputs方法,把此次shuffle的结果outputLocs记录到mapOutputTracker里面。

这个stage结束之后,就到ShuffleRDD运行了,我们看一下它的compute函数。

  1. SparkEnv.get.shuffleFetcher.fetch[P](shuffledId, split.index, context, ser)
复制代码



它是通过ShuffleFetch的fetch方法来抓取的,具体实现在BlockStoreShuffleFetcher里面。

  1. override def fetch[T](
  2.       shuffleId: Int,
  3.       reduceId: Int,
  4.       context: TaskContext,
  5.       serializer: Serializer)
  6.     : Iterator[T] =
  7. {
  8.     val blockManager = SparkEnv.get.blockManager
  9.     val startTime = System.currentTimeMillis
  10.    // mapOutputTracker也分Master和Worker,Worker向Master请求获取reduce相关的MapStatus,主要是(BlockManagerId和size)
  11.     val statuses = SparkEnv.get.mapOutputTracker.getServerStatuses(shuffleId, reduceId)
  12.     // 一个BlockManagerId对应多个文件的大小
  13.     val splitsByAddress = new HashMap[BlockManagerId, ArrayBuffer[(Int, Long)]]
  14.     for (((address, size), index) <- statuses.zipWithIndex) {
  15.       splitsByAddress.getOrElseUpdate(address, ArrayBuffer()) += ((index, size))
  16.     }
  17.     // 构造BlockManagerId 和 BlockId的映射关系,想不到ShffleBlockId的mapId,居然是1,2,3,4的序列...
  18.     val blocksByAddress: Seq[(BlockManagerId, Seq[(BlockId, Long)])] = splitsByAddress.toSeq.map {
  19.       case (address, splits) =>
  20.         (address, splits.map(s => (ShuffleBlockId(shuffleId, s._1, reduceId), s._2)))
  21.     }
  22.     // 名为updateBlock,实际是检验函数,每个Block都对应着一个Iterator接口,如果该接口为空,则应该报错
  23.     def unpackBlock(blockPair: (BlockId, Option[Iterator[Any]])) : Iterator[T] = {
  24.       val blockId = blockPair._1
  25.       val blockOption = blockPair._2
  26.       blockOption match {
  27.         case Some(block) => {
  28.           block.asInstanceOf[Iterator[T]]
  29.         }
  30.         case None => {
  31.           blockId match {
  32.             case ShuffleBlockId(shufId, mapId, _) =>
  33.               val address = statuses(mapId.toInt)._1
  34.               throw new FetchFailedException(address, shufId.toInt, mapId.toInt, reduceId, null)
  35.             case _ =>
  36.               throw new SparkException("Failed to get block " + blockId + ", which is not a shuffle block")
  37.           }
  38.         }
  39.       }
  40.     }
  41.     // 从blockManager获取reduce所需要的全部block,并添加校验函数
  42.     val blockFetcherItr = blockManager.getMultiple(blocksByAddress, serializer)
  43.     val itr = blockFetcherItr.flatMap(unpackBlock)
  44.    
  45.   val completionIter = CompletionIterator[T, Iterator[T]](itr, {
  46.       // CompelteIterator迭代结束之后,会执行以下这部分代码,提交它记录的各种参数
  47.       val shuffleMetrics = new ShuffleReadMetrics
  48.       shuffleMetrics.shuffleFinishTime = System.currentTimeMillis
  49.       shuffleMetrics.fetchWaitTime = blockFetcherItr.fetchWaitTime
  50.       shuffleMetrics.remoteBytesRead = blockFetcherItr.remoteBytesRead
  51.       shuffleMetrics.totalBlocksFetched = blockFetcherItr.totalBlocks
  52.       shuffleMetrics.localBlocksFetched = blockFetcherItr.numLocalBlocks
  53.       shuffleMetrics.remoteBlocksFetched = blockFetcherItr.numRemoteBlocks
  54.       context.taskMetrics.shuffleReadMetrics = Some(shuffleMetrics)
  55.     })
  56.     new InterruptibleIterator[T](context, completionIter)
  57.   }
  58. }
复制代码



1、MapOutputTrackerWorker向MapOutputTrackerMaster获取shuffle相关的map结果信息。
2、把map结果信息构造成BlockManagerId --> Array(BlockId, size)的映射关系。
3、通过BlockManager的getMultiple批量拉取block。
4、返回一个可遍历的Iterator接口,并更新相关的监控参数。

我们继续看getMultiple方法。


  1. def getMultiple(
  2.       blocksByAddress: Seq[(BlockManagerId, Seq[(BlockId, Long)])],
  3.       serializer: Serializer): BlockFetcherIterator = {
  4.     val iter =
  5.       if (conf.getBoolean("spark.shuffle.use.netty", false)) {
  6.         new BlockFetcherIterator.NettyBlockFetcherIterator(this, blocksByAddress, serializer)
  7.       } else {
  8.         new BlockFetcherIterator.BasicBlockFetcherIterator(this, blocksByAddress, serializer)
  9.       }
  10.     iter.initialize()
  11.     iter
  12.   }
复制代码



分两种情况处理,分别是netty的和Basic的,Basic的就不讲了,就是通过ConnectionManager去指定的BlockManager那里获取数据,上一章刚好说了。

我们讲一下Netty的吧,这个是需要设置的才能启用的,不知道性能会不会好一些呢?

看NettyBlockFetcherIterator的initialize方法,再看BasicBlockFetcherIterator的initialize方法,发现Basic的不能同时抓取超过48Mb的数据。


  1. override def initialize() {
  2.       // 分开本地请求和远程请求,返回远程的FetchRequest
  3.       val remoteRequests = splitLocalRemoteBlocks()
  4.       // 抓取顺序随机
  5.       for (request <- Utils.randomize(remoteRequests)) {
  6.         fetchRequestsSync.put(request)
  7.       }
  8.       // 默认是开6个线程去进行抓取
  9.       copiers = startCopiers(conf.getInt("spark.shuffle.copier.threads", 6))// 读取本地的block
  10.       getLocalBlocks()
  11.    }
复制代码



在NettyBlockFetcherIterator的sendRequest方法里面,发现它是通过ShuffleCopier来试下的。


  1.  val cpier = new ShuffleCopier(blockManager.conf)
  2.    cpier.getBlocks(cmId, req.blocks, putResult)
复制代码



这块接下来就是netty的客户端调用的方法了,我对这个不了解。在服务端的处理是在DiskBlockManager内部启动了一个ShuffleSender的服务,最终的业务处理逻辑是在FileServerHandler。

它是通过getBlockLocation返回一个FileSegment,下面这段代码是ShuffleBlockManager的getBlockLocation方法。


  1. def getBlockLocation(id: ShuffleBlockId): FileSegment = {
  2.     // Search all file groups associated with this shuffle.
  3.     val shuffleState = shuffleStates(id.shuffleId)
  4.     for (fileGroup <- shuffleState.allFileGroups) {
  5.       val segment = fileGroup.getFileSegmentFor(id.mapId, id.reduceId)
  6.       if (segment.isDefined) { return segment.get }
  7.     }
  8.     throw new IllegalStateException("Failed to find shuffle block: " + id)
  9.   }
复制代码



先通过shuffleId找到ShuffleState,再通过reduceId找到文件,最后通过mapId确定它的文件分片的位置。但是这里有个疑问了,如果启用了consolidateFiles,一个reduce的所需数据都在一个文件里,是不是就可以把整个文件一起返回呢,而不是通过N个map来多次读取?还是害怕一次发送一个大文件容易失败?这就不得而知了。

到这里整个过程就讲完了。可以看得出来Shuffle这块还是做了一些优化的,但是这些参数并没有启用,有需要的朋友可以自己启用一下试试效果。

相关内容推荐:
Spark源码系列(一)spark-submit提交作业过程
Spark源码系列(二)RDD详解
Spark源码系列(三)作业运行过程
Spark源码系列(四)图解作业生命周期
Spark源码系列(五)分布式缓存
Spark源码系列(六)Shuffle的过程解析
Spark源码系列(七)Spark on yarn具体实现
Spark源码系列(八)Spark Streaming实例分析


作者:岑玉海

欢迎加入about云群90371779322273151432264021 ,云计算爱好者群,亦可关注about云腾讯认证空间||关注本站微信

已有(3)人评论

跳转到指定楼层
Reynold.C 发表于 2016-8-5 10:15:21
好,很好,就是版本低了点
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条