分享

Spark源码系列(五)分布式缓存

本帖最后由 pig2 于 2017-3-1 15:53 编辑


问题导读:

spark缓存是如何实现的?
BlockManager与BlockManagerMaster的关系是什么?




这一章想讲一下Spark的缓存是如何实现的。这个persist方法是在RDD里面的,所以我们直接打开RDD这个类。


  1. def persist(newLevel: StorageLevel): this.type = {
  2.     // StorageLevel不能随意更改
  3.     if (storageLevel != StorageLevel.NONE && newLevel != storageLevel) {
  4.       throw new UnsupportedOperationException("Cannot change storage level of an RDD after it was already assigned a level")
  5.     }
  6.     sc.persistRDD(this)
  7.     // Register the RDD with the ContextCleaner for automatic GC-based cleanup
  8.     // 注册清理方法
  9.     sc.cleaner.foreach(_.registerRDDForCleanup(this))
  10.     storageLevel = newLevel
  11.     this
  12.   }
复制代码



它调用SparkContext去缓存这个RDD,追杀下去。

  1.   private[spark] def persistRDD(rdd: RDD[_]) {
  2.     persistentRdds(rdd.id) = rdd
  3.   }
复制代码



它居然是用一个HashMap来存的,具体看这个map的类型是TimeStampedWeakValueHashMap[Int, RDD[_]]类型。把存进去的值都隐式转换成WeakReference,然后加到一个内部的一个ConcurrentHashMap里面。这里貌似也没干啥,这是有个鸟蛋用。。大神莫喷,知道干啥用的人希望告诉我一下。


CacheManager
现在并没有保存,等到真正运行Task运行的时候才会去缓存起来。入口在Task的runTask方法里面,具体的我们可以看ResultTask,它调用了RDD的iterator方法。


  1. final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
  2.     if (storageLevel != StorageLevel.NONE) {
  3.       SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
  4.     } else {
  5.       computeOrReadCheckpoint(split, context)
  6.     }
  7.   }
复制代码



一旦设置了StorageLevel,就要从SparkEnv的cacheManager取数据。


  1. def getOrCompute[T](rdd: RDD[T], split: Partition, context: TaskContext, storageLevel: StorageLevel): Iterator[T] = {
  2.     val key = RDDBlockId(rdd.id, split.index)
  3.     blockManager.get(key) match {
  4.       case Some(values) =>
  5.         // 已经有了,直接返回就可以了
  6.         new InterruptibleIterator(context, values.asInstanceOf[Iterator[T]])
  7.       case None =>
  8.         // loading包含这个key表示已经有人在加载了,等到loading被释放了,就可以去blockManager里面取到了
  9.         loading.synchronized {
  10.           if (loading.contains(key)) {
  11.             while (loading.contains(key)) {
  12.               try {
  13.                 loading.wait()
  14.               } catch {
  15.                 case e: Exception =>
  16.                   logWarning(s"Got an exception while waiting for another thread to load $key", e)
  17.               }
  18.             }
  19.             // 别人成功拿到了,我们直接取结果就是了,如果别人取失败了,我们再来取一次
  20.             blockManager.get(key) match {
  21.               case Some(values) =>
  22.                 return new InterruptibleIterator(context, values.asInstanceOf[Iterator[T]])
  23.               case None =>
  24.                 loading.add(key)
  25.             }
  26.           } else {
  27.             loading.add(key)
  28.           }
  29.         }
  30.         try {
  31.           // 通过rdd自身的compute方法去计算得到结果,回去看看RDD那文章,自己看看源码就清楚了
  32.           val computedValues = rdd.computeOrReadCheckpoint(split, context)
  33.           // 如果是本地运行的,就没必要缓存了,直接返回即可
  34.           if (context.runningLocally) {
  35.             return computedValues
  36.           }
  37.           // 跟踪blocks的更新状态
  38.           var updatedBlocks = Seq[(BlockId, BlockStatus)]()
  39.           val returnValue: Iterator[T] = {
  40.             if (storageLevel.useDisk && !storageLevel.useMemory) {
  41.               /* 这是RDD采用DISK_ONLY的情况,直接扔给blockManager
  42.                * 然后把结果直接返回,它不需要把结果一下子全部加载进内存
  43.                * 这同样适用于MEMORY_ONLY_SER,但是我们需要在启用它之前确认blocks没被block store给丢弃 */
  44.               updatedBlocks = blockManager.put(key, computedValues, storageLevel, tellMaster = true)
  45.               blockManager.get(key) match {
  46.                 case Some(values) =>
  47.                   values.asInstanceOf[Iterator[T]]
  48.                 case None =>
  49.                   throw new Exception("Block manager failed to return persisted valued")
  50.               }
  51.             } else {
  52.               // 先存到一个ArrayBuffer,然后一次返回,在blockManager里也存一份
  53.               val elements = new ArrayBuffer[Any]
  54.               elements ++= computedValues
  55.               updatedBlocks = blockManager.put(key, elements, storageLevel, tellMaster = true)
  56.               elements.iterator.asInstanceOf[Iterator[T]]
  57.             }
  58.           }
  59.           // 更新task的监控参数
  60.           val metrics = context.taskMetrics
  61.           metrics.updatedBlocks = Some(updatedBlocks)
  62.           new InterruptibleIterator(context, returnValue)
  63.         } finally {
  64.           // 改完了,释放锁
  65.           loading.synchronized {
  66.             loading.remove(key)
  67.             loading.notifyAll()
  68.           }
  69.         }
  70.     }
  71.   }
复制代码



1、如果blockManager当中有,直接从blockManager当中取。
2、如果blockManager没有,就先用RDD的compute函数得到出来一个Iterable接口。
3、如果StorageLevel是只保存在硬盘的话,就把值存在blockManager当中,然后从blockManager当中取,这样的好处是不会一次把数据全部加载进内存。
4、如果StorageLevel是需要使用内存的情况,就把结果添加到一个ArrayBuffer当中一次返回,另外在blockManager存上一份,下次直接从blockManager取。

对StorageLevel说明一下吧,贴一下它的源码。


  1. class StorageLevel private(
  2.     private var useDisk_ : Boolean,
  3.     private var useMemory_ : Boolean,
  4.     private var useOffHeap_ : Boolean,
  5.     private var deserialized_ : Boolean,
  6.     private var replication_ : Int = 1)
  7.   val NONE = new StorageLevel(false, false, false, false)
  8.   val DISK_ONLY = new StorageLevel(true, false, false, false)
  9.   val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
  10.   val MEMORY_ONLY = new StorageLevel(false, true, false, true)
  11.   val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
  12.   val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
  13.   val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
  14.   val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
  15.   val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
  16.   val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
  17.   val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
  18.   val OFF_HEAP = new StorageLevel(false, false, true, false)
复制代码



大家注意看它那几个参数,useDisk_、useMemory_、useOffHeap_、deserialized_、replication_ 在具体的类型的时候是传的什么值。

下面我们的目标要放到blockManager。


BlockManager
BlockManager这个类比较大,我们从两方面开始看吧,putBytes和get方法。先从putBytes说起,之前说过Task运行结束之后,结果超过10M的话,会用BlockManager缓存起来。

  1. env.blockManager.putBytes(blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER)
复制代码



putBytes内部又掉了另外一个方法doPut,方法很大呀,先折叠起来。

  1. private def doPut(
  2.       blockId: BlockId,
  3.       data: Values,
  4.       level: StorageLevel,
  5.       tellMaster: Boolean = true): Seq[(BlockId, BlockStatus)] = {// Return value
  6.     val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
  7.     // 记录它的StorageLevel,以便我们可以在它加载进内存之后,可以按需写入硬盘。
  8.   // 此外,在我们把调用BlockInfo的markReay方法之前,都没法通过get方法获得该部分内容
  9.     val putBlockInfo = {
  10.       val tinfo = new BlockInfo(level, tellMaster)
  11.       // 如果不存在,就添加到blockInfo里面
  12.       val oldBlockOpt = blockInfo.putIfAbsent(blockId, tinfo)
  13.       if (oldBlockOpt.isDefined) {
  14.         // 如果已经存在了,就不需要重复添加了
  15.         if (oldBlockOpt.get.waitForReady()) {return updatedBlocks
  16.         }
  17.         // 存在于blockInfo当中->但是上一次保存失败了,拿出旧的信息,再试一遍
  18.         oldBlockOpt.get
  19.       } else {
  20.         tinfo
  21.       }
  22.     }
  23.     val startTimeMs = System.currentTimeMillis
  24.     // 当我们需要存储数据,并且要复制数据到别的机器,我们需要访问它的值,但是因为我们的put操作会读取整个iterator,
  25.     // 这就不会有任何的值留下。在我们保存序列化的数据的场景,我们可以记住这些bytes,但在其他场景,比如反序列化存储的
  26.     // 时候,我们就必须依赖返回一个Iterator
  27.     var valuesAfterPut: Iterator[Any] = null
  28.     // Ditto for the bytes after the put
  29.     var bytesAfterPut: ByteBuffer = null
  30.     // Size of the block in bytes
  31.     var size = 0L
  32.     // 在保存数据之前,我们要实例化,在数据已经序列化并且准备好发送的情况下,这个过程是很快的
  33.     val replicationFuture = if (data.isInstanceOf[ByteBufferValues] && level.replication > 1) {
  34.       // duplicate并不是复制这些数据,只是做了一个包装
  35.       val bufferView = data.asInstanceOf[ByteBufferValues].buffer.duplicate()
  36.       Future {
  37.         // 把block复制到别的机器上去
  38.         replicate(blockId, bufferView, level)
  39.       }
  40.     } else {
  41.       null
  42.     }
  43.     putBlockInfo.synchronized {
  44.       var marked = false
  45.       try {
  46.         if (level.useMemory) {
  47.           // 首先是保存到内存里面,尽管它也使用硬盘,等内存不够的时候,才会写入硬盘
  48.           // 下面分了三种情况,但是Task的结果是ByteBufferValues这种情况,具体看putBytes方法
  49.           val res = data match {
  50.             case IteratorValues(iterator) =>
  51.               memoryStore.putValues(blockId, iterator, level, true)
  52.             case ArrayBufferValues(array) =>
  53.               memoryStore.putValues(blockId, array, level, true)
  54.             case ByteBufferValues(bytes) =>
  55.               bytes.rewind()
  56.               memoryStore.putBytes(blockId, bytes, level)
  57.           }
  58.           size = res.size
  59.           // 这里写得那么恶心,是跟data的类型有关系的,data: Either[Iterator[_], ByteBuffer],Left是Iterator,Right是ByteBuffer
  60.           res.data match {
  61.             case Right(newBytes) => bytesAfterPut = newBytes
  62.             case Left(newIterator) => valuesAfterPut = newIterator
  63.           }
  64.           // 把被置换到硬盘的blocks记录到updatedBlocks上
  65.           res.droppedBlocks.foreach { block => updatedBlocks += block }
  66.         } else if (level.useOffHeap) {
  67.           // 保存到Tachyon上.
  68.           val res = data match {
  69.             case IteratorValues(iterator) =>
  70.               tachyonStore.putValues(blockId, iterator, level, false)
  71.             case ArrayBufferValues(array) =>
  72.               tachyonStore.putValues(blockId, array, level, false)
  73.             case ByteBufferValues(bytes) =>
  74.               bytes.rewind()
  75.               tachyonStore.putBytes(blockId, bytes, level)
  76.           }
  77.           size = res.size
  78.           res.data match {
  79.             case Right(newBytes) => bytesAfterPut = newBytes
  80.             case _ =>
  81.           }
  82.         } else {
  83.           // 直接保存到硬盘,不要复制到其它节点的就别返回数据了.
  84.           val askForBytes = level.replication > 1
  85.           val res = data match {
  86.             case IteratorValues(iterator) =>
  87.               diskStore.putValues(blockId, iterator, level, askForBytes)
  88.             case ArrayBufferValues(array) =>
  89.               diskStore.putValues(blockId, array, level, askForBytes)
  90.             case ByteBufferValues(bytes) =>
  91.               bytes.rewind()
  92.               diskStore.putBytes(blockId, bytes, level)
  93.           }
  94.           size = res.size
  95.           res.data match {
  96.             case Right(newBytes) => bytesAfterPut = newBytes
  97.             case _ =>
  98.           }
  99.         }
  100.      // 通过blockId获得当前的block状态
  101.         val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo)
  102.         if (putBlockStatus.storageLevel != StorageLevel.NONE) {
  103.           // 成功了,把该block标记为ready,通知BlockManagerMaster
  104.           marked = true
  105.           putBlockInfo.markReady(size)
  106.           if (tellMaster) {
  107.             reportBlockStatus(blockId, putBlockInfo, putBlockStatus)
  108.           }
  109.           updatedBlocks += ((blockId, putBlockStatus))
  110.         }
  111.       } finally {
  112.         // 如果没有标记成功,就把该block信息清除
  113.        if (!marked) {
  114.           blockInfo.remove(blockId)
  115.           putBlockInfo.markFailure()
  116.         }
  117.       }
  118.     }
  119.     // 把数据发送到别的节点做备份
  120.     if (level.replication > 1) {
  121.       data match {
  122.         case ByteBufferValues(bytes) => Await.ready(replicationFuture, Duration.Inf)
  123.         case _ => {
  124.           val remoteStartTime = System.currentTimeMillis
  125.           // 把Iterator里面的数据序列化之后,发送到别的节点
  126.           if (bytesAfterPut == null) {
  127.             if (valuesAfterPut == null) {
  128.               throw new SparkException("Underlying put returned neither an Iterator nor bytes! This shouldn't happen.")
  129.             }
  130.             bytesAfterPut = dataSerialize(blockId, valuesAfterPut)
  131.           }
  132.           replicate(blockId, bytesAfterPut, level)
  133.         }
  134.       }
  135.     }
  136.     // 销毁bytesAfterPut
  137.     BlockManager.dispose(bytesAfterPut)
  138.     updatedBlocks
  139.   }
复制代码



从上面的的来看:

1、存储的时候按照不同的存储级别分了3种情况来处理:存在内存当中(包括MEMORY字样的),存在tachyon上(OFF_HEAP),只存在硬盘上(DISK_ONLY)。
2、存储完成之后会根据存储级别决定是否发送到别的节点,在名字上最后带2字的都是这种,2表示一个block会在两个节点上保存。
3、存储完毕之后,会向BlockManagerMaster汇报block的情况。
4、这里面的序列化其实是先压缩后序列化,默认使用的是LZF压缩,可以通过spark.io.compression.codec设定为snappy或者lzo,序列化方式通过spark.serializer设置,默认是JavaSerializer。




接下来我们再看get的情况。


  1.    val local = getLocal(blockId)
  2.     if (local.isDefined) return local
  3.     val remote = getRemote(blockId)
  4.     if (remote.isDefined) return remote
  5.     None
复制代码



先从本地取,本地没有再去别的节点取,都没有,返回None。从本地取就不说了,怎么进怎么出。讲一下怎么从别的节点去,它们是一个什么样子的关系?

我们先看getRemote方法


  1.   private def doGetRemote(blockId: BlockId, asValues: Boolean): Option[Any] = {
  2.     val locations = Random.shuffle(master.getLocations(blockId))
  3.     for (loc <- locations) {
  4.       val data = BlockManagerWorker.syncGetBlock(GetBlock(blockId), ConnectionManagerId(loc.host, loc.port))
  5.       if (data != null) {
  6.         if (asValues) {
  7.           return Some(dataDeserialize(blockId, data))
  8.         } else {
  9.           return Some(data)
  10.         }
  11.       }
  12.     }
  13.     None
  14.   }
复制代码



这个方法包括两个步骤:

1、用blockId通过master的getLocations方法找到它的位置。
2、通过BlockManagerWorker.syncGetBlock到指定的节点获取数据。
ok,下面就重点讲BlockManager和BlockManagerMaster之间的关系,以及BlockManager之间是如何相互传输数据。


BlockManager与BlockManagerMaster的关系
BlockManager我们使用的时候是从SparkEnv.get获得的,我们观察了一下SparkEnv,发现它包含了我们运行时候常用的那些东东。那它创建是怎么创建的呢,我们找到SparkEnv里面的create方法,右键FindUsages,就会找到两个地方调用了,一个是SparkContext,另一个是Executor。在SparkEnv的create方法里面会实例化一个BlockManager和BlockManagerMaster。这里我们需要注意看BlockManagerMaster的实例化方法,里面调用了registerOrLookup方法。


  1.     def registerOrLookup(name: String, newActor: => Actor): ActorRef = {
  2.       if (isDriver) {
  3.         actorSystem.actorOf(Props(newActor), name = name)
  4.       } else {
  5.         val driverHost: String = conf.get("spark.driver.host", "localhost")
  6.         val driverPort: Int = conf.getInt("spark.driver.port", 7077)
  7.         Utils.checkHost(driverHost, "Expected hostname")
  8.         val url = s"akka.tcp://spark@$driverHost:$driverPort/user/$name"
  9.         val timeout = AkkaUtils.lookupTimeout(conf)
  10.         Await.result(actorSystem.actorSelection(url).resolveOne(timeout), timeout)
  11.       }
  12.     }
复制代码



所以从这里可以看出来,除了Driver之后的actor都是,都是持有的Driver的引用ActorRef。梳理一下,我们可以得出以下结论:

1、SparkContext持有一个BlockManager和BlockManagerMaster。
2、每一个Executor都持有一个BlockManager和BlockManagerMaster。
3、Executor和SparkContext的BlockManagerMaster通过BlockManagerMasterActor来通信。

接下来,我们看看BlockManagerMasterActor里的三组映射关系。


  1. // 1、BlockManagerId和BlockManagerInfo的映射关系
  2.   private val blockManagerInfo = new mutable.HashMap[BlockManagerId, BlockManagerInfo]
  3.   // 2、Executor ID 和 Block manager ID的映射关系
  4.   private val blockManagerIdByExecutor = new mutable.HashMap[String, BlockManagerId]
  5.   // 3、BlockId和保存它的BlockManagerId的映射关系
  6.   private val blockLocations = new JHashMap[BlockId, mutable.HashSet[BlockManagerId]]
复制代码



看到这三组关系,前面的getLocations方法不用看它的实现,我们都应该知道是怎么找了。


BlockManager相互传输数据
BlockManager之间发送数据和接受数据是通过BlockManagerWorker的syncPutBlock和syncGetBlock方法来实现。看BlockManagerWorker的注释,说是BlockManager的网络接口,采用的是事件驱动模型。

再仔细看这两个方法,它传输的数据包装成BlockMessage之后,通过ConnectionManager的sendMessageReliablySync方法来传输。

接下来的故事就是nio之间的发送和接收了,就简单说几点吧:

1、ConnectionManager内部实例化一个selectorThread线程来接收消息,具体请看run方法。
2、Connection发送数据的时候,是一次把消息队列的message全部发送,不是一个一个message发送,具体看SendConnection的write方法,与之对应的接收看ReceivingConnection的read方法。
3、read完了之后,调用回调函数ConnectionManager的receiveMessage方法,它又调用了handleMessage方法,handleMessage又调用了BlockManagerWorker的onBlockMessageReceive方法。传说中的事件驱动又出现了。


  1. def processBlockMessage(blockMessage: BlockMessage): Option[BlockMessage] = {
  2.     blockMessage.getType match {
  3.       case BlockMessage.TYPE_PUT_BLOCK => {
  4.         val pB = PutBlock(blockMessage.getId, blockMessage.getData, blockMessage.getLevel)
  5.         putBlock(pB.id, pB.data, pB.level)
  6.         None
  7.       }
  8.       case BlockMessage.TYPE_GET_BLOCK => {
  9.         val gB = new GetBlock(blockMessage.getId)
  10.         val buffer = getBlock(gB.id)
  11.         Some(BlockMessage.fromGotBlock(GotBlock(gB.id, buffer)))
  12.       }
  13.       case _ => None
  14.     }
  15.   }
复制代码



根据BlockMessage的类型进行处理,put类型就保存数据,get类型就从本地把block读出来返回给它。



注:BlockManagerMasterActor是存在于BlockManagerMaster内部,画在外面只是因为它在通信的时候起了关键的作用的,Executor上持有的BlockManagerMasterActor均是Driver节点的Actor的引用。


广播变量
先回顾一下怎么使用广播变量:

  1. scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
  2. broadcastVar: spark.Broadcast[Array[Int]] = spark.Broadcast(b5c40191-a864-4c7d-b9bf-d87e1a4e787c)
  3. scala> broadcastVar.value
  4. res0: Array[Int] = Array(1, 2, 3)
复制代码



看了一下实现调用的是broadcastFactory的newBroadcast方法。

  1.   def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean) = {
  2.     broadcastFactory.newBroadcast[T](value_, isLocal, nextBroadcastId.getAndIncrement())
  3.   }
复制代码



默认的broadcastFactory是HttpBroadcastFactory,内部还有另外一个实现TorrentBroadcastFactory,先说HttpBroadcastFactory的newBroadcast方法。
它直接new了一个HttpBroadcast。


  1.   HttpBroadcast.synchronized {
  2.     SparkEnv.get.blockManager.putSingle(blockId, value_, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
  3.   }
  4.   if (!isLocal) {
  5.     HttpBroadcast.write(id, value_)
  6.   }
复制代码



它的内部做了两个操作,把数据保存到driver端的BlockManager并且写入到硬盘。
TorrentBroadcast和HttpBroadcast都把数据存进了BlockManager做备份,但是TorrentBroadcast接着并没有把数据写入文件,而是采用了下面这种方式:


  1.   def sendBroadcast() {
  2.     // 把数据给切分了,每4M一个分片
  3.     val tInfo = TorrentBroadcast.blockifyObject(value_)
  4.     totalBlocks = tInfo.totalBlocks
  5.     totalBytes = tInfo.totalBytes
  6.     hasBlocks = tInfo.totalBlocks
  7.     // 把分片的信息存到BlockManager,并通知Master
  8.     val metaId = BroadcastBlockId(id, "meta")
  9.     val metaInfo = TorrentInfo(null, totalBlocks, totalBytes)
  10.     TorrentBroadcast.synchronized {
  11.       SparkEnv.get.blockManager.putSingle(
  12.         metaId, metaInfo, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
  13.     }
  14.     // 遍历所有分片,存到BlockManager上面,并通知Master
  15.     for (i <- 0 until totalBlocks) {
  16.       val pieceId = BroadcastBlockId(id, "piece" + i)
  17.       TorrentBroadcast.synchronized {
  18.         SparkEnv.get.blockManager.putSingle(
  19.           pieceId, tInfo.arrayOfBlocks(i), StorageLevel.MEMORY_AND_DISK, tellMaster = true)
  20.       }
  21.     }
  22.   }
复制代码



1、把数据序列化之后,每4M切分一下。
2、切分完了之后,把所有分片写入BlockManager。

但是找不到它们是怎么传播的??

未完待续!


相关参数
  1. // BlockManager的最大内存
  2. spark.storage.memoryFraction 默认值0.6
  3. // 文件保存的位置
  4. spark.local.dir 默认是系统变量java.io.tmpdir的值
  5. // tachyon保存的地址
  6. spark.tachyonStore.url 默认值tachyon://localhost:19998
  7. // 默认不启用netty来传输shuffle的数据
  8. spark.shuffle.use.netty 默认值是false
  9. spark.shuffle.sender.port 默认值是0
  10. // 一个reduce抓取map中间结果的最大的同时抓取数量大小(to avoid over-allocating memory for receiving shuffle outputs)
  11. spark.reducer.maxMbInFlight 默认值是48*1024*1024
  12. // TorrentBroadcast切分数据块的分片大小
  13. spark.broadcast.blockSize 默认是4096
  14. // 广播变量的工厂类
  15. spark.broadcast.factory 默认是org.apache.spark.broadcast.HttpBroadcastFactory,也可以设置为org.apache.spark.broadcast.TorrentBroadcastFactory
  16. // 压缩格式
  17. spark.io.compression.codec 默认是LZF,可以设置成Snappy或者Lzo
复制代码





上一篇:Spark源码系列(四)图解作业生命周期
下一篇:Spark源码系列(六)Shuffle的过程解析




引用:http://www.cnblogs.com/cenyuhai/p/3808774.html
作者:岑玉海




欢迎加入about云群90371779322273151432264021 ,云计算爱好者群,亦可关注about云腾讯认证空间||关注本站微信

没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条