
Decrypting BlockManager: Principles and Source Code [Original]

This post was last edited by breaking on 2016-3-2 09:01.
Guiding questions:

1. What is the principle behind BlockManager?
2. How can BlockManager be traced through the source code?



[Lesson 38: BlockManager Principles and Source Code]
A running example of BlockManager
BlockManager principle flow diagram
BlockManager source code analysis


Part One: Brief Overview

When an application starts, SparkEnv's createExecutorEnv method creates the Executor-side environment together with the BlockManager on each Executor, while createDriverEnv creates the Driver-side SparkEnv together with the BlockManagerMaster. Every node has a BlockManager; one of them acts as the master (on the Driver) and the rest are slaves. Whenever a block on a slave changes, the slave must send an UpdateBlockInfo message so the master's view of that block stays current. This is a typical centralized design: master and slaves communicate through BlockManagerMasterEndpoint. When the BlockManager on an Executor is instantiated, it registers with the BlockManagerMaster on the Driver, and the BlockManagerMaster creates a BlockManagerInfo for it to manage its metadata.

In summary: BlockManager and Executor are in one-to-one correspondence; the BlockManagerMaster on the Driver manages all BlockManagers; and master and slaves communicate directly over Netty. A simplified model of this centralized design is sketched below.
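To make the centralized pattern concrete, here is a minimal, self-contained sketch in plain Scala. It only illustrates the design described above; the names and signatures (register, updateBlockInfo, the simplified BlockManagerInfo) are invented for the example and are not Spark's real classes.

[mw_shl_code=scala,true]import scala.collection.mutable

object CentralizedModel extends App {
  // Master-side metadata for one slave BlockManager (a simplified stand-in
  // for Spark's BlockManagerInfo).
  case class BlockManagerInfo(
      id: String,
      registeredAt: Long,
      maxMem: Long,
      blocks: mutable.Map[String, Long] = mutable.Map.empty)

  // The master's global view: one entry per registered BlockManager.
  val masterView = mutable.Map.empty[String, BlockManagerInfo]

  // A slave registers itself when its BlockManager is instantiated.
  def register(id: String, maxMem: Long): Unit =
    masterView(id) = BlockManagerInfo(id, System.currentTimeMillis(), maxMem)

  // Every block change on a slave is reported so the master stays current.
  def updateBlockInfo(id: String, blockId: String, size: Long): Unit =
    masterView(id).blocks(blockId) = size

  register("executor-1", 512L * 1024 * 1024)
  updateBlockInfo("executor-1", "rdd_0_0", 1024L)
  println(masterView("executor-1").blocks) // Map(rdd_0_0 -> 1024)
}[/mw_shl_code]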

Part Two: Source Code Analysis

Section One: Observing a running instance of BlockManager
Observing BlockManager from the perspective of Application startup:
1. When the Application starts, the BlockManagerMaster and the MapOutputTracker are registered in the SparkEnv.
Step 1: create the SparkEnv
[mw_shl_code=scala,true]private[spark] def createSparkEnv(
    conf: SparkConf,
    isLocal: Boolean,
    listenerBus: LiveListenerBus): SparkEnv = {
  SparkEnv.createDriverEnv(conf, isLocal, listenerBus, SparkContext.numDriverCores(master))
}[/mw_shl_code]
Step 2: create the Driver-side SparkEnv
[mw_shl_code=scala,true]private[spark] def createDriverEnv(
    conf: SparkConf,
    isLocal: Boolean,
    listenerBus: LiveListenerBus,
    numCores: Int,
    mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {
  assert(conf.contains("spark.driver.host"), "spark.driver.host is not set on the driver!")
  assert(conf.contains("spark.driver.port"), "spark.driver.port is not set on the driver!")
  val hostname = conf.get("spark.driver.host")
  val port = conf.get("spark.driver.port").toInt
  create(
    conf,
    SparkContext.DRIVER_IDENTIFIER,
    hostname,
    port,
    isDriver = true,
    isLocal = isLocal,
    numUsableCores = numCores,
    listenerBus = listenerBus,
    mockOutputCommitCoordinator = mockOutputCommitCoordinator
  )
}[/mw_shl_code]
Step 3: call create (SparkEnv's private factory method)
[mw_shl_code=scala,true]private def create(
    conf: SparkConf,
    executorId: String,
    hostname: String,
    port: Int,
    isDriver: Boolean,
    isLocal: Boolean,
    numUsableCores: Int,
    listenerBus: LiveListenerBus = null,
    mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {

  // Listener bus is only used on the driver
  if (isDriver) {
    assert(listenerBus != null, "Attempted to create driver SparkEnv with null listener bus!")
  }

  val securityManager = new SecurityManager(conf)

  val systemName = if (isDriver) driverSystemName else executorSystemName
  val rpcEnv = RpcEnv.create(systemName, hostname, port, conf, securityManager,
    clientMode = !isDriver)

  // Figure out which port RpcEnv actually bound to in case the original port is 0 or occupied.
  // In the non-driver case, the RPC env's address may be null since it may not be listening
  // for incoming connections.
  if (isDriver) {
    conf.set("spark.driver.port", rpcEnv.address.port.toString)
  } else if (rpcEnv.address != null) {
    conf.set("spark.executor.port", rpcEnv.address.port.toString)
  }

  // Create an instance of the class with the given name, possibly initializing it with our conf
  def instantiateClass[T](className: String): T = {
    val cls = Utils.classForName(className)
    // Look for a constructor taking a SparkConf and a boolean isDriver, then one taking just
    // SparkConf, then one taking no arguments
    try {
      cls.getConstructor(classOf[SparkConf], java.lang.Boolean.TYPE)
        .newInstance(conf, new java.lang.Boolean(isDriver))
        .asInstanceOf[T]
    } catch {
      case _: NoSuchMethodException =>
        try {
          cls.getConstructor(classOf[SparkConf]).newInstance(conf).asInstanceOf[T]
        } catch {
          case _: NoSuchMethodException =>
            cls.getConstructor().newInstance().asInstanceOf[T]
        }
    }
  }

  // Create an instance of the class named by the given SparkConf property, or defaultClassName
  // if the property is not set, possibly initializing it with our conf
  def instantiateClassFromConf[T](propertyName: String, defaultClassName: String): T = {
    instantiateClass[T](conf.get(propertyName, defaultClassName))
  }

  val serializer = instantiateClassFromConf[Serializer](
    "spark.serializer", "org.apache.spark.serializer.JavaSerializer")
  logDebug(s"Using serializer: ${serializer.getClass}")

  val closureSerializer = instantiateClassFromConf[Serializer](
    "spark.closure.serializer", "org.apache.spark.serializer.JavaSerializer")

  def registerOrLookupEndpoint(
      name: String, endpointCreator: => RpcEndpoint):
    RpcEndpointRef = {
    if (isDriver) {
      logInfo("Registering " + name)
      rpcEnv.setupEndpoint(name, endpointCreator)
    } else {
      RpcUtils.makeDriverRef(name, conf, rpcEnv)
    }
  }

  val mapOutputTracker = if (isDriver) {
    new MapOutputTrackerMaster(conf)
  } else {
    new MapOutputTrackerWorker(conf)
  }

  // Have to assign trackerEndpoint after initialization as MapOutputTrackerEndpoint
  // requires the MapOutputTracker itself
  mapOutputTracker.trackerEndpoint = registerOrLookupEndpoint(MapOutputTracker.ENDPOINT_NAME,
    new MapOutputTrackerMasterEndpoint(
      rpcEnv, mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], conf))

  // Let the user specify short names for shuffle managers
  val shortShuffleMgrNames = Map(
    "hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
    "sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager",
    "tungsten-sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager")
  val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
  val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
  val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)

  val useLegacyMemoryManager = conf.getBoolean("spark.memory.useLegacyMode", false)
  val memoryManager: MemoryManager =
    if (useLegacyMemoryManager) {
      new StaticMemoryManager(conf, numUsableCores)
    } else {
      UnifiedMemoryManager(conf, numUsableCores)
    }

  val blockTransferService = new NettyBlockTransferService(conf, securityManager, numUsableCores)

  val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint(
    BlockManagerMaster.DRIVER_ENDPOINT_NAME,
    new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus)),
    conf, isDriver)

  // NB: blockManager is not valid until initialize() is called later.
  val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster,
    serializer, conf, memoryManager, mapOutputTracker, shuffleManager,
    blockTransferService, securityManager, numUsableCores)

  val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)

  val cacheManager = new CacheManager(blockManager)

  val metricsSystem = if (isDriver) {
    // Don't start metrics system right now for Driver.
    // We need to wait for the task scheduler to give us an app ID.
    // Then we can start the metrics system.
    MetricsSystem.createMetricsSystem("driver", conf, securityManager)
  } else {
    // We need to set the executor ID before the MetricsSystem is created because sources and
    // sinks specified in the metrics configuration file will want to incorporate this executor's
    // ID into the metrics they report.
    conf.set("spark.executor.id", executorId)
    val ms = MetricsSystem.createMetricsSystem("executor", conf, securityManager)
    ms.start()
    ms
  }[/mw_shl_code]

Note: this method creates the BlockManagerMaster; when a BlockManager is being created on an Executor, the new BlockManager registers itself with the BlockManagerMaster on the Driver.
a) BlockManagerMaster: manages Block data for the entire cluster;
b) MapOutputTrackerMaster: tracks the output of every mapper; a toy sketch of what that tracking means follows.
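The sketch below illustrates the idea behind tracking mapper output, with all names invented for the example: for each shuffle, the master remembers, per map task, which node holds the output and how large each reduce partition's slice is, so reducers know where to fetch from.

[mw_shl_code=scala,true]import scala.collection.mutable

object MapOutputSketch extends App {
  // One entry per finished map task: where it ran, and the size of the
  // bytes it produced for each reduce partition.
  case class MapStatus(location: String, sizesByReducer: Array[Long])

  val mapStatuses = mutable.Map.empty[Int, Array[MapStatus]]

  // The two map tasks of shuffle 0 have finished on two executors.
  mapStatuses(0) = Array(
    MapStatus("executor-1:7337", Array(10L, 20L)),
    MapStatus("executor-2:7337", Array(5L, 15L)))

  // A reducer for partition 1 asks the tracker where its inputs live.
  for ((status, mapId) <- mapStatuses(0).zipWithIndex) {
    println(s"map $mapId: fetch ${status.sizesByReducer(1)} bytes from ${status.location}")
  }
}[/mw_shl_code]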
2. BlockManagerMasterEndpoint is itself a message endpoint; it manages the BlockManagers on all nodes through remote messaging. Its receiveAndReply method receives the messages that the BlockManagers on the Executors send to it (each registration also carries the Executor's BlockManagerSlaveEndpoint reference, which the master later uses to send messages back). The protocol uses case classes rather than case objects, because a case object is globally shared and cannot carry per-message state; a small demo follows this list. The main messages:
1) RegisterBlockManager: in BlockManager's initialize method, the Executor registers its local BlockManager with the BlockManagerMaster on the Driver. On receiving this message, the BlockManagerMasterEndpoint creates a BlockManagerInfo holding the BlockManager's metadata: its id, memory size, creation time, and the slaveEndpoint (used to send messages to that BlockManager).
2) UpdateBlockInfo: updates a BlockManager's metadata, for example when the Driver broadcasts a variable or a ShuffleMapTask completes.
3) GetLocations: retrieves Block location information, for example when a ShuffledRDD is computed and asks the mapOutputTracker for the previous Stage's data.
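A small demo of the case class vs. case object point above (toy types, not Spark's actual protocol classes): a case class message carries its own per-sender payload, while a case object is a single globally shared instance that cannot carry per-message state.

[mw_shl_code=scala,true]sealed trait ToMaster

// Carries per-message data, so each sender gets its own instance.
case class RegisterBlockManager(id: String, maxMem: Long) extends ToMaster

// A single shared instance for the whole JVM; it cannot carry payload.
case object Heartbeat extends ToMaster

object MessageDemo extends App {
  val a = RegisterBlockManager("executor-1", 512L * 1024 * 1024)
  val b = RegisterBlockManager("executor-2", 512L * 1024 * 1024)
  println(a == b)                 // false: different payloads
  println(Heartbeat eq Heartbeat) // true: one global object
}[/mw_shl_code]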
3. Every time an ExecutorBackend starts, it instantiates a BlockManager and registers it with the BlockManagerMaster via remote communication; in essence, the Executor's BlockManager registers itself at startup with the BlockManagerMasterEndpoint on the Driver.
4. MemoryStore is the class within BlockManager dedicated to storing and reading data in memory:
private[spark] val memoryStore = new MemoryStore(this, memoryManager)
Both MemoryStore and DiskStore are instantiated when the BlockManager is created. A toy sketch of the MemoryStore idea follows.
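The following toy sketch shows only the core idea of an in-memory block store: a bounded byte budget plus insertion-ordered eviction. Spark's real MemoryStore (unrolling, pinning, MemoryManager accounting) is far richer; every name here is invented for the example.

[mw_shl_code=scala,true]import java.util.LinkedHashMap

object TinyMemoryStore extends App {
  val maxBytes = 1024L
  var used = 0L
  // Insertion-ordered map: iteration starts at the oldest entry.
  val entries = new LinkedHashMap[String, Array[Byte]]()

  def putBytes(id: String, bytes: Array[Byte]): Boolean = {
    val it = entries.entrySet().iterator()
    while (used + bytes.length > maxBytes && it.hasNext) {
      used -= it.next().getValue.length // evict oldest blocks first
      it.remove()
    }
    if (used + bytes.length > maxBytes) return false // block cannot fit at all
    entries.put(id, bytes)
    used += bytes.length
    true
  }

  println(putBytes("rdd_0_0", new Array[Byte](600))) // true
  println(putBytes("rdd_0_1", new Array[Byte](600))) // true, evicts rdd_0_0
  println(entries.containsKey("rdd_0_0"))            // false
}[/mw_shl_code]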
5. DiskStore is the class within BlockManager dedicated to disk-based data storage and reads/writes:
private[spark] val diskStore = new DiskStore(this, diskBlockManager)
DiskStore is instantiated when the BlockManager is created.
6. DiskBlockManager manages the mapping between logical Blocks and the physical Blocks on disk, and handles creating, reading, and writing the files on disk:
val diskBlockManager = new DiskBlockManager(this, conf)
DiskBlockManager is instantiated when the BlockManager is created.
Getting a file:
[mw_shl_code=scala,true]def getFile(filename: String): File = {
  // Figure out which local directory it hashes to, and which subdirectory in that
  val hash = Utils.nonNegativeHash(filename)
  val dirId = hash % localDirs.length
  val subDirId = (hash / localDirs.length) % subDirsPerLocalDir

  // Create the subdirectory if it doesn't already exist
  val subDir = subDirs(dirId).synchronized {
    val old = subDirs(dirId)(subDirId)
    if (old != null) {
      old
    } else {
      val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
      if (!newDir.exists() && !newDir.mkdir()) {
        throw new IOException(s"Failed to create local dir in $newDir.")
      }
      subDirs(dirId)(subDirId) = newDir
      newDir
    }
  }

  new File(subDir, filename)
}[/mw_shl_code]
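To see how getFile spreads files across directories, here is a standalone re-computation of the same arithmetic. The directory counts are assumptions chosen for the example (4 local dirs, 64 sub-directories per dir), and nonNegativeHash is re-implemented here to mirror the guard in Utils.nonNegativeHash.

[mw_shl_code=scala,true]object HashPlacementDemo extends App {
  val localDirCount = 4       // assumption: spark.local.dir lists 4 directories
  val subDirsPerLocalDir = 64 // assumption: 64 sub-dirs per local dir

  // Mirrors Utils.nonNegativeHash's guard: math.abs fails for Int.MinValue.
  def nonNegativeHash(s: String): Int = {
    val h = s.hashCode
    if (h != Int.MinValue) math.abs(h) else 0
  }

  val filename = "shuffle_0_1_0.data"
  val hash = nonNegativeHash(filename)
  val dirId = hash % localDirCount
  val subDirId = (hash / localDirCount) % subDirsPerLocalDir

  // Same layout getFile produces: localDirs(dirId)/<subDirId as two hex digits>/<filename>
  println(f"$filename%s -> localDirs($dirId%d)/$subDirId%02x/$filename%s")
}[/mw_shl_code]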

Creating a temporary shuffle block:

[mw_shl_code=scala,true]/** Produces a unique block id and File suitable for storing shuffled intermediate results. */
def createTempShuffleBlock(): (TempShuffleBlockId, File) = {
  var blockId = new TempShuffleBlockId(UUID.randomUUID())
  while (getFile(blockId).exists()) {
    blockId = new TempShuffleBlockId(UUID.randomUUID())
  }
  (blockId, getFile(blockId))
}[/mw_shl_code]

HashShuffleWriter writes the intermediate shuffle results to disk. In its write path it writes a bunch of records to this task's output:
shuffle.writers(bucketId).write(elem._1, elem._2)
Ultimately DiskBlockObjectWriter is called to write the serialized results to disk; a simplified stand-in is sketched below.
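As a conceptual stand-in for DiskBlockObjectWriter, this sketch serializes (key, value) records into a per-bucket file with plain Java serialization. The real writer adds buffering, write metrics, and commit/revert semantics; everything here is illustrative only.

[mw_shl_code=scala,true]import java.io.{File, FileOutputStream, ObjectOutputStream}

object BucketWriterSketch extends App {
  val bucketFile = File.createTempFile("bucket_0_", ".data")
  val out = new ObjectOutputStream(new FileOutputStream(bucketFile))
  try {
    // Write a bunch of records to this task's output, one (key, value) at a time.
    for ((k, v) <- Seq(("spark", 1), ("block", 2))) {
      out.writeObject(k)
      out.writeObject(v)
    }
  } finally {
    out.close()
  }
  println(s"wrote ${bucketFile.length()} bytes to $bucketFile")
}[/mw_shl_code]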
Observing BlockManager from the perspective of Job execution (taking a wordcount program as the example):
1. Broadcast variables are stored through the MemoryStore first. The sc.textFile method calls hadoopFile:
[mw_shl_code=scala,true]/** Get an RDD for a Hadoop file with an arbitrary InputFormat
 *
 * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
 * record, directly caching the returned RDD or directly passing it to an aggregation or shuffle
 * operation will create many references to the same object.
 * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
 * copy them using a `map` function.
 */
def hadoopFile[K, V](
    path: String,
    inputFormatClass: Class[_ <: InputFormat[K, V]],
    keyClass: Class[K],
    valueClass: Class[V],
    minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
  assertNotStopped()
  // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
  val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration))
  val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
  new HadoopRDD(
    this,
    confBroadcast,
    Some(setInputPathsFunc),
    inputFormatClass,
    keyClass,
    valueClass,
    minPartitions).setName(path)
}[/mw_shl_code]

Here the Hadoop configuration is broadcast to the executors.
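The same broadcast mechanism is available in user code. A minimal local-mode example (the configuration map is a made-up stand-in for the Hadoop conf):

[mw_shl_code=scala,true]import org.apache.spark.{SparkConf, SparkContext}

object BroadcastSketch extends App {
  val sc = new SparkContext(
    new SparkConf().setAppName("broadcast-sketch").setMaster("local[2]"))

  // Broadcast a read-only value once; each executor caches it in its BlockManager
  // instead of shipping a copy with every task.
  val confBroadcast = sc.broadcast(Map("fs.defaultFS" -> "hdfs://namenode:9000"))

  val result = sc.parallelize(1 to 4)
    .map(i => s"task $i sees ${confBroadcast.value.size} conf entries")
    .collect()

  result.foreach(println)
  sc.stop()
}[/mw_shl_code]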
2. On the Driver, the metadata of the BlockManager in every ExecutorBackend is managed through BlockManagerInfo. In BlockManagerMasterEndpoint's register method:
blockManagerInfo(id) = new BlockManagerInfo(id, System.currentTimeMillis(), maxMemSize, slaveEndpoint)
When an Executor's BlockManager registers, a BlockManagerInfo is created recording metadata such as the registration time, the maximum memory, and the slaveEndpoint.
3. Whenever Block information on a concrete ExecutorBackend changes, a message must be sent to the BlockManagerMaster on the Driver to update the corresponding BlockManagerInfo:
[mw_shl_code=scala,true]override def run(): Unit = {
  val taskMemoryManager = new TaskMemoryManager(env.memoryManager, taskId)
  val deserializeStartTime = System.currentTimeMillis()
  Thread.currentThread.setContextClassLoader(replClassLoader)
  val ser = env.closureSerializer.newInstance()
  logInfo(s"Running $taskName (TID $taskId)")
  execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
  var taskStart: Long = 0
  startGCTime = computeTotalGcTime()

  try {
    val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask)
    updateDependencies(taskFiles, taskJars)
    task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)
    task.setTaskMemoryManager(taskMemoryManager)

    // If this task has been killed before we deserialized it, let's quit now. Otherwise,
    // continue executing the task.
    if (killed) {
      // Throw an exception rather than returning, because returning within a try{} block
      // causes a NonLocalReturnControl exception to be thrown. The NonLocalReturnControl
      // exception will be caught by the catch block, leading to an incorrect ExceptionFailure
      // for the task.
      throw new TaskKilledException
    }

    logDebug("Task " + taskId + "'s epoch is " + task.epoch)
    env.mapOutputTracker.updateEpoch(task.epoch)

    // Run the actual task and measure its runtime.
    taskStart = System.currentTimeMillis()
    var threwException = true
    val value = try {
      val res = task.run(
        taskAttemptId = taskId,
        attemptNumber = attemptNumber,
        metricsSystem = env.metricsSystem)
      threwException = false
      res
    } finally {
      // ... (memory/lock cleanup elided in this excerpt)
    }
    val taskFinish = System.currentTimeMillis()

    // ... (serialization of `value` elided in this excerpt; it produces valueBytes and
    // the beforeSerialization/afterSerialization timestamps used below)

    for (m <- task.metrics) {
      m.setExecutorDeserializeTime(
        (taskStart - deserializeStartTime) + task.executorDeserializeTime)
      // We need to subtract Task.run()'s deserialization time to avoid double-counting
      m.setExecutorRunTime((taskFinish - taskStart) - task.executorDeserializeTime)
      m.setJvmGCTime(computeTotalGcTime() - startGCTime)
      m.setResultSerializationTime(afterSerialization - beforeSerialization)
    }

    // Note: accumulator updates must be collected after TaskMetrics is updated
    val accumUpdates = task.collectAccumulatorUpdates()
    val directResult = new DirectTaskResult(valueBytes, accumUpdates)
    val serializedDirectResult = ser.serialize(directResult)
    val resultSize = serializedDirectResult.limit

    // directSend = sending directly back to the driver
    val serializedResult: ByteBuffer = {
      if (maxResultSize > 0 && resultSize > maxResultSize) {
        logWarning(s"Finished $taskName (TID $taskId). Result is larger than maxResultSize " +
          s"(${Utils.bytesToString(resultSize)} > ${Utils.bytesToString(maxResultSize)}), " +
          s"dropping it.")
        ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))
      } else if (resultSize >= maxRpcMessageSize) {
        val blockId = TaskResultBlockId(taskId)
        env.blockManager.putBytes(
          blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER)
        logInfo(
          s"Finished $taskName (TID $taskId). $resultSize bytes result sent via BlockManager)")
        ser.serialize(new IndirectTaskResult[Any](blockId, resultSize))
      } else {
        logInfo(s"Finished $taskName (TID $taskId). $resultSize bytes result sent to driver")
        serializedDirectResult
      }
    }
    execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
  } // ... (catch/finally clauses elided in this excerpt)
}[/mw_shl_code]

When env.blockManager.putBytes(blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER) runs, BlockManager calls its reportBlockStatus method so that the BlockManagerInfo on the Driver is updated.
4. When the second Stage executes, it sends a message to the MapOutputTrackerMasterEndpoint on the Driver requesting the outputs of the previous Stage; the MapOutputTrackerMaster then sends the metadata of the previous Stage's output back to the requesting Stage.
For example, in the wordcount program, when the last stage computes its ShuffledRDD:
[mw_shl_code=scala,true]override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {
  val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
  SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, context)
    .read()
    .asInstanceOf[Iterator[(K, C)]]
}[/mw_shl_code]

Next we follow the read method. In BlockStoreShuffleReader, the implementing class of ShuffleReader, read begins:
[mw_shl_code=scala,true]override def read(): Iterator[Product2[K, C]] = {
  val blockFetcherItr = new ShuffleBlockFetcherIterator(
    context,
    blockManager.shuffleClient,
    blockManager,
    mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startPartition, endPartition),
    // ... (remaining arguments and method body elided in this excerpt)[/mw_shl_code]

Continuing into getMapSizesByExecutorId:
[mw_shl_code=scala,true]def getMapSizesByExecutorId(shuffleId: Int, startPartition: Int, endPartition: Int)
    : Seq[(BlockManagerId, Seq[(BlockId, Long)])] = {
  logDebug(s"Fetching outputs for shuffle $shuffleId, partitions $startPartition-$endPartition")
  val statuses = getStatuses(shuffleId)
  // Synchronize on the returned array because, on the driver, it gets mutated in place
  statuses.synchronized {
    return MapOutputTracker.convertMapStatuses(shuffleId, startPartition, endPartition, statuses)
  }
}[/mw_shl_code]

This calls the getStatuses method (in the MapOutputTracker class), whose key lines are:
[mw_shl_code=scala,true]val fetchedBytes = askTracker[Array[Byte]](GetMapOutputStatuses(shuffleId))
fetchedStatuses = MapOutputTracker.deserializeMapStatuses(fetchedBytes)
logInfo("Got the output locations")
mapStatuses.put(shuffleId, fetchedStatuses)[/mw_shl_code]

When the MapOutputTrackerMasterEndpoint receives the GetMapOutputStatuses message:

[mw_shl_code=scala,true]case GetMapOutputStatuses(shuffleId: Int) =>
  val hostPort = context.senderAddress.hostPort
  logInfo("Asked to send map output locations for shuffle " + shuffleId + " to " + hostPort)
  val mapOutputStatuses = tracker.getSerializedMapOutputStatuses(shuffleId)
  val serializedSize = mapOutputStatuses.length
  if (serializedSize > maxRpcMessageSize) {
    val msg = s"Map output statuses were $serializedSize bytes which " +
      s"exceeds spark.rpc.message.maxSize ($maxRpcMessageSize bytes)."

    /* For SPARK-1244 we'll opt for just logging an error and then sending it to the sender.
     * A bigger refactoring (SPARK-1239) will ultimately remove this entire code path. */
    val exception = new SparkException(msg)
    logError(msg, exception)
    context.sendFailure(exception)
  } else {
    context.reply(mapOutputStatuses)
  }[/mw_shl_code]

Finally the output locations of the previous stage are returned, as the driver log shows:
16/02/14 20:47:15 INFO spark.MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 0 to Worker3:45965
16/02/14 20:47:15 INFO spark.MapOutputTrackerMaster: Size of output statuses for shuffle 0 is 218 bytes
16/02/14 20:47:15 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on Worker4:46458 (size: 22.6 KB, free: 2.7 GB)
16/02/14 20:47:15 INFO spark.MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 0 to Worker4:57697

Figure 1: BlockManager execution flow diagram (image: BlockManager工作原理和运行机制简图.png)

Figure 2: BlockManager execution flow diagram (image file name not recoverable)





Comments (2)

瓦力, posted 2016-3-1 21:59:21:
spark imf..... keep it up!

一生一世一双人, posted 2016-3-13 11:22:02:
Great post! Good to see 瓦力's work; keep it going.