分享

大数据技术Spark学习入门2

本帖最后由 levycui 于 2019-6-19 20:43 编辑
问题导读:
1、SparkContext 如何创建过程?
2、如何理解Task 执行和回馈?
3、Spark 的数据存储概念有哪些?
4、如何理解Spark Shuffle 过程?




上一篇:大数据技术Spark学习入门1

第8章 SparkContext 解析
8.1 SparkContext 解析

SparkContext 是用户通往 Spark 集群的唯一入口,任何需要使用 Spark 的地方都需要先创建 SparkContext,那么 SparkContext 做了什么?
首先 SparkContext 是在 Driver 程序里面启动的,可以看做 Driver 程序和 Spark 集群的一个连接,SparkContext 在初始化的时候,创建了很多对象,如下图所示:
EtPcJx.png
上图列出了 SparkContext 在初始化创建的时候的一些主要组件的构建。
8.2 SparkContext 创建过程
EtPhOe.png
详解如下:

SparkContext 在新建时:
1)内部创建一个 SparkEnv,SparkEnv 内部创建一个 RpcEnv。
    a) RpcEnv 内部创建并注册一个 MapOutputTrackerMasterEndpoint(该 Endpoint 暂不介绍)
2)接着创建 DAGScheduler、TaskSchedulerImpl、SchedulerBackend。
    a) TaskSchedulerImpl 创建时创建 SchedulableBuilder,SchedulableBuilder 根据类型分为 FIFOSchedulableBuilder、FairSchedulableBuilder 两类
3)最后启动 TaskSchedulerImpl,TaskSchedulerImpl 启动 SchedulerBackend。
    a) SchedulerBackend 启动时创建 ApplicationDescription、DriverEndpoint、StandloneAppClient
    b) StandloneAppClient 内部包括一个 ClientEndpoint

8.3 SparkContext 简易结构与交互关系

EtPgW6.png
详解如下:

1)SparkContext:是用户 Spark 执行任务的上下文,用户程序内部使用 Spark 提供的 Api 直接或间接创建一个 SparkContext。
2)SparkEnv:用户执行的环境信息,包括通信相关的端点。
3)RpcEnv:SparkContext 中远程通信环境。
4)ApplicationDescription:应用程序描述信息,主要包含 appName、maxCores、memoryPerExecutorMB、coresPerExecutor、Command (CoarseGrainedExecutorBackend)、appUiUrl 等。
5)ClientEndpoint:客户端端点,启动后向 Master 发起注册 RegisterApplication 请求。
6)Master:接受 RegisterApplication 请求后,进行 Worker 资源分配,并向分配的资源发起 LaunchExecutor 指令。
7)Worker:接受 LaunchExecutor 指令后,运行 ExecutorRunner。
8)ExecutorRunner:运行 applicationDescription 的 Command 命令,最终 Executor,同时向 DriverEndpoint 注册 Executor 信息。

8.4 Master 对 Application 资源分配

当 Master 接受 Driver 的 RegisterApplication 请求后,放入 waitingDrivers 队列中,在同一调度中进行资源分配,分配过程如下:
EtPRSK.png
详解如下:

waitingApps 与 aliveWorkers 进行资源匹配:
1)如果 waitingApp 配置了 app.desc.coresPerExecutor:
    a) 轮询所有有效可分配的 worker,每次分配一个 executor,executor 的核数为 minCoresPerExecutor(app.desc.coresPerExecutor),直到不存在有效可分配资源或者 app 依赖的资源已全部被分配。
2)如果 waitingApp 没有配置 app.desc.coresPerExecutor:
    a) 轮询所有有效可分配的 worker,每个 worker 分配一个 executor,executor 的核数为从 minCoresPerExecutor(为固定值1) 开始递增,直到不存在有效可分配资源或者 app 依赖的资源已全部被分配。
3)其中有效可分配 worker 定义为满足一次资源分配的 worker:
    a) cores 满足:usableWorkers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutor
    b) memory 满足(如果是新的 Executor):usableWorkers(pos).memoryFree - assignedExecutors(pos) * memoryPerExecutor >= memoryPerExecutor
注意:Master 针对于 applicationInfo 进行资源分配时,只有存在有效可用的资源就直接分配,而分配剩余的 app.coresLeft 则等下一次再进行分配。

8.5 Worker 创建 Executor

EtP7FI.png
(图解:橙色组件是 Endpoint 组件)

详解如下:

Worker 启动 Executor
1)在 Worker 的 tempDir 下面创建 application 以及 executor 的目录,并 chmod 700 操作权限。
2)创建并启动 ExecutorRunner 进行 Executor 的创建。
3)向 Master 发送 Executor 的状态情况。

ExecutorRnner
1)新线程【ExecutorRunner for [executorId]】读取 ApplicationDescription 将其中 Command 转化为本地的 Command 命令。
2)调用 Command 并将日志输出至 executor 目录下的 stdout 和 stderr 日志文件中,Command 对应的 java 类为 CoarseGrainedExecutorBackend。

CoarseGrainedExecutorBackend
1)创建一个 SparkEnv,创建 ExecutorEndpoint(CoarseGrainedExecutorBackend)以及 WorkerWatcher。
2)ExecutorEndpoint 创建并启动后,向 DriverEndpoint 发送 RegisterExecutor 请求并等待返回。
3)DriverEndpoint 处理 RegisterExecutor 请求,返回 ExecutorEndpointRegister 的结果。
4)如果注册成功,ExecutorEndpoint 内部再创建 Executor 的处理对象。

至此,Spark 运行任务的容器框架就搭建完成。


第9章 Job 提交和 Task 的拆分

在前面的章节 Client 的加载中,Spark 的 DriverRunner 已开始执行用户任务类(比如:org.apache.spark.examples.SparkPi),下面我们开始针对于用户任务类(或者任务代码)进行分析:
9.1 整体预览
EtPWQO.png
详解如下:

1)Code:指的用户编写的代码
2)RDD:弹性分布式数据集,用户编码根据 SparkContext 与 RDD 的 api 能够很好的将 Code 转化为 RDD 数据结构(下文将做转化细节介绍)。
3)DAGScheduler:有向无环图调度器,将 RDD 封装为 JobSubmitted 对象存入 EventLoop (实现类DAGSchedulerEventProcessLoop) 队列中。
4)EventLoop: 定时扫描未处理 JobSubmitted 对象,将 JobSubmitted 对象提交给 DAGScheduler。
5)DAGScheduler:针对于 JobSubmitted 进行处理,最终将 RDD 转化为执行 TaskSet,并将 TaskSet 提交至 TaskScheduler。
6)TaskScheduler: 根据 TaskSet 创建 TaskSetManager 对象存入 SchedulableBuilder 的数据池(Pool)中,并调用 DriverEndpoint 唤起消费(ReviveOffers)操作。
7)DriverEndpoint:接受 ReviveOffers 指令后将 TaskSet 中的 Tasks 根据相关规则均匀分配给Executor。
8)Executor:启动一个 TaskRunner 执行一个 Task。

9.2 Code 转化为初始 RDDs

我们的用户代码通过调用 Spark 的 Api(比如:SparkSession.builder.appName("Spark Pi").getOrCreate()),该 Api 会创建 Spark 的上下文(SparkContext),当我们调用 transform 类方法(如:parallelize(),map())都会创建(或者装饰已有的)Spark 数据结构(RDD),如果是 action 类操作(如:reduce()),那么将最后封装的 RDD 作为一次 Job 提交,存入待调度队列中(DAGSchedulerEventProcessLoop )待后续异步处理。
如果多次调用 action 类操作,那么封装的多个 RDD 作为多个 Job 提交。
流程如下:
EtPfyD.png
详解如下:

ExecuteEnv(执行环境)
1)这里可以是通过 spark-submit 提交的 MainClass,也可以是 spark-shell 脚本。
2)MainClass:代码中必定会创建或者获取一个 SparkContext。
3)spark-shell:默认会创建一个 SparkContext。

RDD(弹性分布式数据集)

1)create:可以直接创建(如:sc.parallelize(1 until n, slices) ),也可以在其他地方读取(如:sc.textFile("README.md"))等。
2)transformation:rdd 提供了一组 api 可以进行对已有 RDD 进行反复封装成为新的 RDD,这里采用的是`装饰者设计模式`,下面为部分装饰器类图。
3)action:当调用 RDD 的 action 类操作方法时(collect、reduce、lookup、save ),这触发 DAGScheduler 的 Job 提交。
4)DAGScheduler:创建一个名为 JobSubmitted 的消息至 DAGSchedulerEventProcessLoop 阻塞消息队列(LinkedBlockingDeque)中。
5)DAGSchedulerEventProcessLoop:启动名为【dag-scheduler-event-loop】的线程实时消费消息队列。
6)【dag-scheduler-event-loop】处理完成后回调 JobWaiter。
7)DAGScheduler:打印 Job 执行结果。
8)JobSubmitted:相关代码如下(其中 jobId 为 DAGScheduler 全局递增 Id)。
    eventProcessLoop.post(JobSubmitted(
            jobId, rdd, func2, partitions.toArray, callSite, waiter,
            SerializationUtils.clone(properties)))

部分装饰器类图
EtP5eH.png
最终示例:
EtPoTA.png
最终转化的 RDD 分为四层,每层都依赖于上层 RDD,将 ShffleRDD 封装为一个 Job 存入 DAGSchedulerEventProcessLoop 待处理,如果我们的代码中存在几段上面示例代码,那么就会创建对应对的几个 ShffleRDD 分别存入 DAGSchedulerEventProcessLoop 中。

9.3 RDD 分解为待执行任务集合(TaskSet)

Job 提交后,DAGScheduler 根据 RDD 层次关系解析为对应的 Stages,同时维护 Job 与 Stage 的关系。
将最上层的 Stage 根据并发关系(findMissingPartitions)分解为多个 Task,将这个多个 Task 封装为 TaskSet 提交给 TaskScheduler。非最上层的 Stage 的存入处理的列表中(waitingStages += stage)
流程如下:
EtPIwd.png
详解如下:

1)DAGSchedulerEventProcessLoop中,线程【dag-scheduler-event-loop】处理到 JobSubmitted
2)调用 DAGScheduler 进行 handleJobSubmitted
    a) 首先根据 RDD 依赖关系依次创建 Stage 族,Stage 分为 ShuffleMapStage、ResultStage 两类,如下图所示:
    b) 更新 jobId 与 StageId 关系 Map
    c) 创建 ActiveJob,调用 LiveListenerBug,发送 SparkListenerJobStart 指令
    d) 找到最上层 Stage 进行提交,下层 Stage 存入 waitingStage 中待后续处理
        1) 调用 OutputCommitCoordinator 进行 stageStart() 处理
        2) 调用 LiveListenerBug,发送 SparkListenerStageSubmitted 指令
        3) 调用 SparkContext的broadcast 方法获取 Broadcast 对象,根据 Stage 类型创建对应多个 Task,一个 Stage 根据 findMissingPartitions 分为多个对应的 Task,Task 分为 ShuffleMapTask、ResultTask
        4) 将 Task 封装为 TaskSet,调用 TaskScheduler.submitTasks(taskSet) 进行 Task 调度,关键代码如下:
            taskScheduler.submitTasks(new TaskSet(
                tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))

ShuffleMapStage、ResultStage 两类
ENpVtx.png

9.4 TaskSet 封装为 TaskSetManager 并提交至 Driver

TaskScheduler 将 TaskSet 封装为 TaskSetManager(new TaskSetManager(this, taskSet, maxTaskFailures, blacklistTrackerOpt)),存入待处理任务池(Pool)中,发送 DriverEndpoint 唤起消费(ReviveOffers)指令。
EtPbfP.png
详解如下:

1)DAGSheduler 将 TaskSet 提交给 TaskScheduler 的实现类,这里是 TaskChedulerImpl。
2)TaskSchedulerImpl 创建一个 TaskSetManager 管理 TaskSet,关键代码如下:
    new TaskSetManager(this, taskSet, maxTaskFailures, blacklistTrackerOpt)
3)同时将 TaskSetManager 添加 SchedduableBuilder 的任务池 Poll 中。
4)调用 SchedulerBackend 的实现类进行 reviveOffers,这里是 standlone 模式的实现类 StandaloneSchedulerBackend。
5)SchedulerBackend 发送 ReviveOffers 指令至 DriverEndpoint。

9.5 Driver 将 TaskSetManager 分解为 TaskDescriptions 并发布任务到 Executor

Driver 接受唤起消费指令后,将所有待处理的 TaskSetManager 与 Driver 中注册的 Executor 资源进行匹配,最终一个 TaskSetManager 得到多个 TaskDescription 对象,按照 TaskDescription 相对应的 Executor 发送 LaunchTask 指令。
EtPxmQ.png
详解如下:

当 Driver 获取到 ReviveOffers(请求消费)指令时
1)首先根据 executorDataMap 缓存信息得到可用的 Executor 资源信息(WorkerOffer),关键代码如下:
[mw_shl_code=scala,true]    val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
    val workOffers = activeExecutors.map { case (id, executorData) =>
        new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
    }.toIndexedSeq[/mw_shl_code]

2)接着调用 TaskScheduler 进行资源匹配,方法定义如下:
    def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {..}
    a) 将 WorkerOffer 资源打乱,如:val shuffledOffers = Random.shuffle(offers)
    b) 将 Pool 中待处理的 TaskSetManager 取出,如:val sortedTaskSets = rootPool.getSortedTaskSetQueue
    c) 并循环处理 sortedTaskSets 并与 shuffledOffers 循环匹配,如果 shuffledOffers(i) 有足够的 CPU 资源( if (availableCpus(i) >= CPUS_PER_TASK)),调用 TaskSetManager 创建 TaskDescription 对象(taskSet.resourceOffer(execId, host, maxLocality)),最终创建了多个 TaskDescription,TaskDescription 定义如下:
[mw_shl_code=scala,true]        new TaskDescription(
            taskId,
            attemptNum,
            execId,
            taskName,
            index,
            sched.sc.addedFiles,
            sched.sc.addedJars,
            task.localProperties,
            serializedTask)[/mw_shl_code]

3)如果 TaskDescriptions 不为空,循环 TaskDescriptions,序列化 TaskDescription 对象,并向 ExecutorEndpoint 发送 LaunchTask 指令,关键代码如下:
[mw_shl_code=scala,true]    for (task <- taskDescriptions.flatten) {
            val serializedTask = TaskDescription.encode(task)
            val executorData = executorDataMap(task.executorId)
            executorData.freeCores -= scheduler.CPUS_PER_TASK
            executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
    }[/mw_shl_code]


第10章 Task 执行和回馈

DriverEndpoint 最终生成多个可执行的 TaskDescription 对象,并向各个 ExecutorEndpoint 发送 LaunchTask 指令,本节内容将关注 ExecutorEndpoint 如何处理 LaunchTask 指令,处理完成后如何回馈给 DriverEndpoint,以及整个 job 最终如何多次调度直至结束。
10.1 Task 的执行流程

Executor 接受 LaunchTask 指令后,开启一个新线程 TaskRunner 解析 RDD,并调用 RDD 的 compute 方法,归并函数得到最终任务执行结果。
EtPX6S.png
详解如下:

1)ExecutorEndpoint 接受到 LaunchTask 指令后,解码出 TaskDescription,调用 Executor 的 launchTask 方法。
2)Executor 创建一个 TaskRunner 线程,并启动线程,同时将改线程添加到 Executor 的成员对象中,代码如下:
    private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]
    runningTasks.put(taskDescription.taskId, taskRunner)

TaskRunner
1)首先向 DriverEndpoint 发送任务最新状态为 RUNNING。
2)从 TaskDescription 解析出 Task,并调用 Task 的 run 方法。

Task
1)创建 TaskContext 以及 CallerContext (与 HDFS 交互的上下文对象)。
2)执行 Task 的 runTask 方法:
    a) 如果 Task 实例为 ShuffleMapTask:解析出 RDD 以及 ShuffleDependency 信息,调用 RDD 的 compute() 方法将结果写 Writer 中(Writer 这里不介绍,可以作为黑盒理解,比如写入一个文件中),返回 MapStatus 对象。
    b) 如果 Task 实例为 ResultTask:解析出 RDD 以及合并函数信息,调用函数将调用后的结果返回。

TaskRunner 将 Task 执行的结果序列化,再次向 DriverEndpoint 发送任务最新状态为 FINISHED。

10.2 Task 的回馈流程

TaskRunner 执行结束后,都将执行状态发送至 DriverEndpoint,DriverEndpoint 最终反馈指令 CompletionEvent 发送至 DAGSchedulerEventProcessLoop 中。
EtPLSf.png
详解如下:

1)DriverEndpoint 接收到 StatusUpdate 消息后,调用 TaskScheduler 的 statusUpdate(taskId, state, result) 方法
2)TaskScheduler 如果任务结果是完成,那么清除该任务处理中的状态,并调动 TaskResultGetter 相关方法,关键代码如下:
[mw_shl_code=scala,true]    val taskSet = taskIdToTaskSetManager.get(tid)

    taskIdToTaskSetManager.remove(tid)
            taskIdToExecutorId.remove(tid).foreach { executorId =>
        executorIdToRunningTaskIds.get(executorId).foreach { _.remove(tid) }
    }
    taskSet.removeRunningTask(tid)
    if (state == TaskState.FINISHED) {
        taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
    } else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {
        taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
    }[/mw_shl_code]

TaskResultGetter 启动线程启动线程【task-result-getter】进行相关处理:
1)通过解析或者远程获取得到 Task 的 TaskResult 对象。
2)调用 TaskSet 的 handleSuccessfulTask 方法,TaskSet 的 handleSuccessfulTask 方法直接调用 TaskSetManager 的 handleSuccessfulTask 方法。

TaskSetManager
1)更新内部 TaskInfo 对象状态,并将该 Task 从运行中 Task 的集合删除,代码如下:
    val info = taskInfos(tid)
    info.markFinished(TaskState.FINISHED, clock.getTimeMillis())
    removeRunningTask(tid)
2)调用 DAGScheduler 的 taskEnded 方法,关键代码如下:
    sched.dagScheduler.taskEnded(tasks(index), Success, result.value(), result.accumUpdates, info)

DAGScheduler 向 DAGSchedulerEventProcessLoop 存入 CompletionEvent 指令,CompletionEvent 对象定义如下:
[mw_shl_code=scala,true]    private[scheduler] case class CompletionEvent(
        task: Task[_],
        reason: TaskEndReason,
        result: Any,
        accumUpdates: Seq[AccumulatorV2[_, _]],
        taskInfo: TaskInfo) extends DAGSchedulerEvent[/mw_shl_code]

10.3 Task 的迭代流程

DAGSchedulerEventProcessLoop 中针对于 CompletionEvent 指令,调用 DAGScheduler 进行处理,DAGScheduler 更新 Stage 与该 Task 的关系状态,如果 Stage 下 Task 都返回,则做下一层 Stage 的任务拆解与运算工作,直至 Job 被执行完毕:
EtPzwj.png
详解如下:

1)DAGSchedulerEventProcessLoop 接收到 CompletionEvent 指令后,调用 DAGScheduler 的 handleTaskCompletion 方法。
2)DAGScheduler 根据 Task 的类型分别处理。
3)如果 Task 为 ShuffleMapTask
    a) 等待回馈的 Partitions 减去当前 partitionId
    b) 如果所有 task 都返回,则 markStageAsFinished(shuffleStage),同时向 MapOutputTrackerMaster 注册 MapOutputs 信息,且 markMapStageJobAsFinished
    c) 调用 submitWaitingChildStages(shuffleStage) 进行下层 Stages 的处理,从而迭代处理,最终处理到 ResultTask,job 结束,关键代码如下:
[mw_shl_code=scala,true]        private def submitWaitingChildStages(parent: Stage) {
            ...
            val childStages = waitingStages.filter(_.parents.contains(parent)).toArray
            waitingStages --= childStages
            for (stage <- childStages.sortBy(_.firstJobId)) {
                submitStage(stage)
            }
        }[/mw_shl_code]
4)如果 Task 为 ResultTask
    a) 该 job 的 partitions 都已返回,则 markStageAsFinished(resultStage),并 cleanupStateForJobAndIndependentStages(job),关键代码如下:
[mw_shl_code=scala,true]       for (stage <- stageIdToStage.get(stageId)) {
            if (runningStages.contains(stage)) {
                logDebug("Removing running stage %d".format(stageId))
                runningStages -= stage
            }
            for ((k, v) <- shuffleIdToMapStage.find(_._2 == stage)) {
                shuffleIdToMapStage.remove(k)
            }
            if (waitingStages.contains(stage)) {
                logDebug("Removing stage %d from waiting set.".format(stageId))
                waitingStages -= stage
            }
            if (failedStages.contains(stage)) {
                logDebug("Removing stage %d from failed set.".format(stageId))
                failedStages -= stage
            }
        }
        // data structures based on StageId
        stageIdToStage -= stageId
        jobIdToStageIds -= job.jobId
        jobIdToActiveJob -= job.jobId
        activeJobs -= job[/mw_shl_code]

至此,用户编写的代码最终调用 Spark 分布式计算完毕。

10.4 精彩图解

Spark的交互流程 – 节点启动
EtPOl8.png
Spark的交互流程 – 应用提交
EtPjOg.png
Spark的交互流程 – 任务运行
EtjLi8.png
Spark的交互流程 – 任务运行
Etj7Zt.png

第11章 Spark 的数据存储

Spark 计算速度远胜于 Hadoop 的原因之一就在于中间结果是缓存在内存而不是直接写入到 disk,本文尝试分析 Spark 中存储子系统的构成,并以数据写入和数据读取为例,讲述清楚存储子系统中各部件的交互关系。

11.1 存储子系统概览

Storage 模块主要分为两层:
1) 通信层:storage 模块采用的是 master-slave 结构来实现通信层,master 和 slave 之间传输控制信息、状态信息,这些都是通过通信层来实现的。
2) 存储层:storage 模块需要把数据存储到 disk 或是 memory 上面,有可能还需 replicate(复制) 到远端,这都是由存储层来实现和提供相应接口。
而其他模块若要和 storage 模块进行交互,storage 模块提供了统一的操作类 BlockManager,外部类与 storage 模块打交道都需要通过调用 BlockManager 相应接口来实现。
EtjbIf.png
上图是Spark存储子系统中几个主要模块的关系示意图,现简要说明如下:

1)CacheManager         RDD 在进行计算的时候,通过 CacheManager 来获取数据,并通过 CacheManager 来存储计算结果。
2)BlockManager         CacheManager 在进行数据读取和存取的时候主要是依赖 BlockManager 接口来操作,BlockManager 决定数据是从内存(MemoryStore) 还是从磁盘(DiskStore) 中获取。
3)MemoryStore          负责将数据保存在内存或从内存读取。
4)DiskStore            负责将数据写入磁盘或从磁盘读入。
5)BlockManagerWorker   数据写入本地的 MemoryStore 或 DiskStore 是一个同步操作,为了容错还需要将数据复制到别的计算结点,以防止数据丢失的时候还能够恢复,数据复制的操作是异步完成,由 BlockManagerWorker 来处理这一部分事情。
6)ConnectionManager    负责与其它计算结点建立连接,并负责数据的发送和接收。
7)BlockManagerMaster   注意该模块只运行在 Driver Application 所在的 Executor,功能是负责记录下所有 BlockIds 存储在哪个 SlaveWorker 上,比如 RDD Task 运行在机器 A,所需要的 BlockId 为 3,但在机器 A 上没有 BlockId 为 3 的数值,这个时候 Slave worker 需要通过 BlockManager 向 BlockManagerMaster 询问数据存储的位置,然后再通过 ConnectionManager 去获取。

11.2 启动过程分析

上述的各个模块由 SparkEnv 来创建,创建过程在 SparkEnv.create 中完成,代码如下:

[mw_shl_code=scala,true]val blockManagerMaster = new BlockManagerMaster(registerOrLookup(
        "BlockManagerMaster",
        new BlockManagerMasterActor(isLocal, conf)), conf)
val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster, serializer, conf)

val connectionManager = blockManager.connectionManager
val broadcastManager = new BroadcastManager(isDriver, conf)
val cacheManager = new CacheManager(blockManager)[/mw_shl_code]

下面这段代码容易让人疑惑,看起来像是在所有的 cluster node 上都创建了 BlockManagerMasterActor,其实不然,仔细看 registerOrLookup 函数的实现。如果当前节点是 driver 则创建这个 actor,否则建立到 driver 的连接。代码如下:

[mw_shl_code=scala,true]def registerOrLookup(name: String, newActor: => Actor): ActorRef = {
    if (isDriver) {
        logInfo("Registering " + name)
        actorSystem.actorOf(Props(newActor), name = name)
    } else {
        val driverHost: String = conf.get("spark.driver.host", "localhost")
        val driverPort: Int = conf.getInt("spark.driver.port", 7077)
        Utils.checkHost(driverHost, "Expected hostname")
        val url = s"akka.tcp://spark@$driverHost:$driverPort/user/$name"
        val timeout = AkkaUtils.lookupTimeout(conf)
        logInfo(s"Connecting to $name: $url")
        Await.result(actorSystem.actorSelection(url).resolveOne(timeout), timeout)
    }
}[/mw_shl_code]

初始化过程中一个主要的动作就是 BlockManager 需要向 BlockManagerMaster 发起注册。

11.3 通信层
EtjXRg.png
BlockManager 包装了 BlockManagerMaster,发送信息包装成 BlockManagerInfo。Spark 在 Driver 和 Worker 端都创建各自的 BlockManager,并通过 BlockManagerMaster 进行通信,通过 BlockManager 对 Storage 模块进行操作。
BlockManager 对象在 SparkEnv.create 函数中进行创建,代码如下:

[mw_shl_code=scala,true]def registerOrLookupEndpoint(
        name: String, endpointCreator: => RpcEndpoint):
RpcEndpointRef = {
    if (isDriver) {
        logInfo("Registering " + name)
        rpcEnv.setupEndpoint(name, endpointCreator)
    } else {
        RpcUtils.makeDriverRef(name, conf, rpcEnv)
    }
}
......
val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint(
        BlockManagerMaster.DRIVER_ENDPOINT_NAME,
        new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus)),
        conf, isDriver)

// NB: blockManager is not valid until initialize() is called later.
val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster,
        serializer, conf, mapOutputTracker, shuffleManager, blockTransferService, securityManager,numUsableCores)[/mw_shl_code]

并且在创建之前对当前节点是否是 Driver 进行了判断。如果是,则创建这个 Endpoint;否则,创建 Driver 的连接。

在创建 BlockManager 之后,BlockManager 会调用 initialize 方法初始化自己。并且初始化的时候,会调用 BlockManagerMaster 向 Driver 注册自己,同时,在注册时也启动了Slave Endpoint。另外,向本地 shuffle 服务器注册 Executor 配置,如果存在的话。代码如下:

[mw_shl_code=scala,true]def initialize(appId: String): Unit = {
......
    master.registerBlockManager(blockManagerId, maxMemory, slaveEndpoint)

    // Register Executors' configuration with the local shuffle service, if one should exist.
    if (externalShuffleServiceEnabled && !blockManagerId.isDriver) {
        registerWithExternalShuffleServer()
    }
}[/mw_shl_code]

而 BlockManagerMaster 将注册请求包装成 RegisterBlockManager 注册到 Driver。Driver 的 BlockManagerMasterEndpoint 会调用 register 方法,通过对消息 BlockManagerInfo 检查,向 Driver 注册,代码如下:

[mw_shl_code=scala,true]private def register(id: BlockManagerId, maxMemSize: Long, slaveEndpoint: RpcEndpointRef) {
    val time = System.currentTimeMillis()
    if (!blockManagerInfo.contains(id)) {
        blockManagerIdByExecutor.get(id.executorId) match {
            case Some(oldId) =>
                // A block manager of the same executor already exists, so remove it (assumed dead)
                logError("Got two different block manager registrations on same executor - "
                        + s" will replace old one $oldId with new one $id")
                removeExecutor(id.executorId)
            case None =>
        }
        logInfo("Registering block manager %s with %s RAM, %s".format(
                id.hostPort, Utils.bytesToString(maxMemSize), id))

        blockManagerIdByExecutor(id.executorId) = id

        blockManagerInfo(id) = new BlockManagerInfo(
                id, System.currentTimeMillis(), maxMemSize, slaveEndpoint)
    }
    listenerBus.post(SparkListenerBlockManagerAdded(time, id, maxMemSize))
}[/mw_shl_code]

不难发现 BlockManagerInfo 对象被保存到 Map 映射中。在通信层中 BlockManagerMaster 控制着消息的流向,这里采用了模式匹配,所有的消息模式都在 BlockManagerMessage 中。

11.4 存储层
EtjoqI.png
Spark Storage 的最小存储单位是 block,所有的操作都是以 block 为单位进行的。
在 BlockManager 被创建的时候 MemoryStore 和 DiskStore 对象就被创建出来了。代码如下:

[mw_shl_code=scala,true]val diskBlockManager = new DiskBlockManager(this, conf)
private[spark] val memoryStore = new MemoryStore(this, maxMemory)
private[spark] val diskStore = new DiskStore(this, diskBlockManager)[/mw_shl_code]

11.4.1 Disk Store

由于当前的 Spark 版本对 Disk Store 进行了更细粒度的分工,把对文件的操作提取出来放到了 DiskBlockManager 中,DiskStore 仅仅负责数据的存储和读取。
Disk Store 会配置多个文件目录,Spark 会在不同的文件目录下创建文件夹,其中文件夹的命名方式是:spark-UUID(随机UUID码)。Disk Store 在存储的时候创建文件夹。并且根据【高内聚,低耦合】原则,这种服务型的工具代码就放到了 Utils 中(调用路径:DiskStore.putBytes —> DiskBlockManager.createLocalDirs —> Utils.createDirectory),代码如下:

[mw_shl_code=scala,true]def createDirectory(root: String, namePrefix: String = "spark"): File = {
    var attempts = 0
    val maxAttempts = MAX_DIR_CREATION_ATTEMPTS
    var dir: File = null
    while (dir == null) {
        attempts += 1
        if (attempts > maxAttempts) {
            throw new IOException("Failed to create a temp directory (under " + root + ") after " +
                    maxAttempts + " attempts!")
        }
        try {
            dir = new File(root, namePrefix + "-" + UUID.randomUUID.toString)
            if (dir.exists() || !dir.mkdirs()) {
                dir = null
            }
        } catch { case e: SecurityException => dir = null; }
    }

    dir.getCanonicalFile
}[/mw_shl_code]

在 DiskBlockManager 里,每个 block 都被存储为一个 file,通过计算 blockId 的 hash 值,将 block 映射到文件中。

[mw_shl_code=scala,true]def getFile(filename: String): File = {
    // Figure out which local directory it hashes to, and which subdirectory in that
    val hash = Utils.nonNegativeHash(filename)
    val dirId = hash % localDirs.length
    val subDirId = (hash / localDirs.length) % subDirsPerLocalDir

    // Create the subdirectory if it doesn't already exist
    val subDir = subDirs(dirId).synchronized {
        val old = subDirs(dirId)(subDirId)
        if (old != null) {
            old
        } else {
            val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
            if (!newDir.exists() && !newDir.mkdir()) {
                throw new IOException(s"Failed to create local dir in $newDir.")
            }
            subDirs(dirId)(subDirId) = newDir
            newDir
        }
    }

    new File(subDir, filename)
}

def getFile(blockId: BlockId): File = getFile(blockId.name)[/mw_shl_code]

通过 hash 值的取模运算,求出 dirId 和 subDirId。然后,在从 subDirs 中找到 subDir,如果 subDir 不存在,则创建一个新 subDir。最后,以 subDir 为路径,blockId 的 name 属性为文件名,新建该文件。
文件创建完之后,那么 Spark 就会在 DiskStore 中向文件写与之映射的 block,代码如下:

[mw_shl_code=scala,true]override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel): PutResult = {
    val bytes = _bytes.duplicate()
    logDebug(s"Attempting to put block $blockId")
    val startTime = System.currentTimeMillis
    val file = diskManager.getFile(blockId)
    val channel = new FileOutputStream(file).getChannel
    Utils.tryWithSafeFinally {
        while (bytes.remaining > 0) {
            channel.write(bytes)
        }
    } {
        channel.close()
    }
    val finishTime = System.currentTimeMillis
    logDebug("Block %s stored as %s file on disk in %d ms".format(
            file.getName, Utils.bytesToString(bytes.limit), finishTime - startTime))
    PutResult(bytes.limit(), Right(bytes.duplicate()))
}[/mw_shl_code]

读取过程就简单了,DiskStore 根据 blockId 读取与之映射的 file 内容,当然,这中间需要从 DiskBlockManager 中得到文件信息。代码如下:

[mw_shl_code=scala,true]private def getBytes(file: File, offset: Long, length: Long): Option[ByteBuffer] = {
    val channel = new RandomAccessFile(file, "r").getChannel
    Utils.tryWithSafeFinally {
        // For small files, directly read rather than memory map
        if (length < minMemoryMapBytes) {
            val buf = ByteBuffer.allocate(length.toInt)
            channel.position(offset)
            while (buf.remaining() != 0) {
                if (channel.read(buf) == -1) {
                    throw new IOException("Reached EOF before filling buffer\n" +
                            s"offset=$offset\nfile=${file.getAbsolutePath}\nbuf.remaining=${buf.remaining}")
                }
            }
            buf.flip()
            Some(buf)
        } else {
            Some(channel.map(MapMode.READ_ONLY, offset, length))
        }
    } {
        channel.close()
    }
}

override def getBytes(blockId: BlockId): Option[ByteBuffer] = {
    val file = diskManager.getFile(blockId.name)
    getBytes(file, 0, file.length)
}[/mw_shl_code]

11.4.2 Memory Store

相对 Disk Store,Memory Store 就显得容易很多。Memory Store 用一个 LinkedHashMap 来管理,其中 Key 是 blockId,Value 是 MemoryEntry 样例类,MemoryEntry 存储着数据信息。代码如下:

[mw_shl_code=scala,true]private case class MemoryEntry(value: Any, size: Long, deserialized: Boolean)
private val entries = new LinkedHashMap[BlockId, MemoryEntry](32, 0.75f, true)[/mw_shl_code]

在 MemoryStore 中存储 block 的前提是当前内存有足够的空间存放。通过对 tryToPut 函数的调用对内存空间进行判断。代码如下:

[mw_shl_code=scala,true]def putBytes(blockId: BlockId, size: Long, _bytes: () => ByteBuffer): PutResult = {
    // Work on a duplicate - since the original input might be used elsewhere.
    lazy val bytes = _bytes().duplicate().rewind().asInstanceOf[ByteBuffer]
    val putAttempt = tryToPut(blockId, () => bytes, size, deserialized = false)
    val data =
    if (putAttempt.success) {
        assert(bytes.limit == size)
        Right(bytes.duplicate())
    } else {
        null
    }
    PutResult(size, data, putAttempt.droppedBlocks)
}[/mw_shl_code]

在 tryToPut 函数中,通过调用 enoughFreeSpace 函数判断内存空间。如果内存空间足够,那么就把 block 放到 LinkedHashMap 中;如果内存不足,那么就告诉 BlockManager 内存不足,如果允许 Disk Store,那么就把该 block 放到 disk 上。代码如下:

[mw_shl_code=scala,true]private def tryToPut(blockId: BlockId, value: () => Any, size: Long, deserialized: Boolean): ResultWithDroppedBlocks = {
    var putSuccess = false
    val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]

    accountingLock.synchronized {
        val freeSpaceResult = ensureFreeSpace(blockId, size)
        val enoughFreeSpace = freeSpaceResult.success
        droppedBlocks ++= freeSpaceResult.droppedBlocks

        if (enoughFreeSpace) {
            val entry = new MemoryEntry(value(), size, deserialized)
            entries.synchronized {
                entries.put(blockId, entry)
                currentMemory += size
            }
            val valuesOrBytes = if (deserialized) "values" else "bytes"
            logInfo("Block %s stored as %s in memory (estimated size %s, free %s)".format(
                    blockId, valuesOrBytes, Utils.bytesToString(size), Utils.bytesToString(freeMemory)))
            putSuccess = true
        } else {
            lazy val data = if (deserialized) {
                Left(value().asInstanceOf[Array[Any]])
            } else {
                Right(value().asInstanceOf[ByteBuffer].duplicate())
            }
            val droppedBlockStatus = blockManager.dropFromMemory(blockId, () => data)
            droppedBlockStatus.foreach { status => droppedBlocks += ((blockId, status)) }
        }
        releasePendingUnrollMemoryForThisTask()
    }
    ResultWithDroppedBlocks(putSuccess, droppedBlocks)
}[/mw_shl_code]

Memory Store 读取 block 也很简单,只需要从 LinkedHashMap 中取出 blockId 的 Value 即可。代码如下:

[mw_shl_code=scala,true]override def getValues(blockId: BlockId): Option[Iterator[Any]] = {
    val entry = entries.synchronized {
        entries.get(blockId)
    }
    if (entry == null) {
        None
    } else if (entry.deserialized) {
        Some(entry.value.asInstanceOf[Array[Any]].iterator)
    } else {
        val buffer = entry.value.asInstanceOf[ByteBuffer].duplicate() // Doesn't actually copy data
        Some(blockManager.dataDeserialize(blockId, buffer))
    }
}[/mw_shl_code]

11.5 数据写入过程分析
ENp9cF.png
数据写入的简要流程:

1)RDD.iterator 是与 storage 子系统交互的入口。
2)CacheManager.getOrCompute 调用 BlockManager 的 put 接口来写入数据。
3)数据优先写入到 MemoryStore 即内存,如果 MemoryStore 中的数据已满则将最近使用次数不频繁的数据写入到磁盘。
4)通知 BlockManagerMaster 有新的数据写入,在 BlockManagerMaster 中保存元数据。
5)将写入的数据与其它 slave worker 进行同步,一般来说在本机写入的数据,都会另先一台机器来进行数据的备份,即 replicanumber=1。
其实,我们在 put 和 get block 的时候并没有那么复杂,前面的细节 BlockManager 都包装好了,我们只需要调用 BlockManager 中的 put 和 get 函数即可。

代码如下:

[mw_shl_code=scala,true]def putBytes(
           blockId: BlockId,
           bytes: ByteBuffer,
           level: StorageLevel,
           tellMaster: Boolean = true,
           effectiveStorageLevel: Option[StorageLevel] = None): Seq[(BlockId, BlockStatus)] = {
       require(bytes != null, "Bytes is null")
       doPut(blockId, ByteBufferValues(bytes), level, tellMaster, effectiveStorageLevel)
   }
   private def doPut(
           blockId: BlockId,
           data: BlockValues,
           level: StorageLevel,
           tellMaster: Boolean = true,
           effectiveStorageLevel: Option[StorageLevel] = None)
: Seq[(BlockId, BlockStatus)] = {

       require(blockId != null, "BlockId is null")
       require(level != null && level.isValid, "StorageLevel is null or invalid")
       effectiveStorageLevel.foreach { level =>
           require(level != null && level.isValid, "Effective StorageLevel is null or invalid")
       }

       val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]

       val putBlockInfo = {
               val tinfo = new BlockInfo(level, tellMaster)
               val oldBlockOpt = blockInfo.putIfAbsent(blockId, tinfo)
       if (oldBlockOpt.isDefined) {
           if (oldBlockOpt.get.waitForReady()) {
               logWarning(s"Block $blockId already exists on this machine; not re-adding it")
               return updatedBlocks
           }
           oldBlockOpt.get
       } else {
           tinfo
       }
}

       val startTimeMs = System.currentTimeMillis

       var valuesAfterPut: Iterator[Any] = null

       var bytesAfterPut: ByteBuffer = null

       var size = 0L

       val putLevel = effectiveStorageLevel.getOrElse(level)

       val replicationFuture = data match {
           case b: ByteBufferValues if putLevel.replication > 1 =>
               // Duplicate doesn't copy the bytes, but just creates a wrapper
               val bufferView = b.buffer.duplicate()
               Future {
               replicate(blockId, bufferView, putLevel)
           }(futureExecutionContext)
           case _ => null
       }

       putBlockInfo.synchronized {
           logTrace("Put for block %s took %s to get into synchronized block"
                   .format(blockId, Utils.getUsedTimeMs(startTimeMs)))

           var marked = false
           try {
               val (returnValues, blockStore: BlockStore) = {
                   if (putLevel.useMemory) {
                       (true, memoryStore)
                   } else if (putLevel.useOffHeap) {
                       (false, externalBlockStore)
                   } else if (putLevel.useDisk) {
                       (putLevel.replication > 1, diskStore)
                   } else {
                       assert(putLevel == StorageLevel.NONE)
                       throw new BlockException(
                               blockId, s"Attempted to put block $blockId without specifying storage level!")
                   }
               }

               val result = data match {
                   case IteratorValues(iterator) =>
                       blockStore.putIterator(blockId, iterator, putLevel, returnValues)
                   case ArrayValues(array) =>
                       blockStore.putArray(blockId, array, putLevel, returnValues)
                   case ByteBufferValues(bytes) =>
                       bytes.rewind()
                       blockStore.putBytes(blockId, bytes, putLevel)
               }
               size = result.size
               result.data match {
                   case Left (newIterator) if putLevel.useMemory => valuesAfterPut = newIterator
                   case Right (newBytes) => bytesAfterPut = newBytes
                   case _ =>
               }

               if (putLevel.useMemory) {
                   result.droppedBlocks.foreach { updatedBlocks += _ }
               }

               val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo)
               if (putBlockStatus.storageLevel != StorageLevel.NONE) {
                   marked = true
                   putBlockInfo.markReady(size)
                   if (tellMaster) {
                       reportBlockStatus(blockId, putBlockInfo, putBlockStatus)
                   }
                   updatedBlocks += ((blockId, putBlockStatus))
               }
           } finally {
               if (!marked) {
                   blockInfo.remove(blockId)
                   putBlockInfo.markFailure()
                   logWarning(s"Putting block $blockId failed")
               }
           }
       }
       logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs)))

       if (putLevel.replication > 1) {
           data match {
               case ByteBufferValues(bytes) =>
                   if (replicationFuture != null) {
                       Await.ready(replicationFuture, Duration.Inf)
                   }
               case _ =>
                   val remoteStartTime = System.currentTimeMillis
                   if (bytesAfterPut == null) {
                       if (valuesAfterPut == null) {
                           throw new SparkException(
                                   "Underlying put returned neither an Iterator nor bytes! This shouldn't happen.")
                       }
                       bytesAfterPut = dataSerialize(blockId, valuesAfterPut)
                   }
                   replicate(blockId, bytesAfterPut, putLevel)
                   logDebug("Put block %s remotely took %s"
                           .format(blockId, Utils.getUsedTimeMs(remoteStartTime)))
           }
       }

       BlockManager.dispose(bytesAfterPut)

       if (putLevel.replication > 1) {
           logDebug("Putting block %s with replication took %s"
                   .format(blockId, Utils.getUsedTimeMs(startTimeMs)))
       } else {
           logDebug("Putting block %s without replication took %s"
                   .format(blockId, Utils.getUsedTimeMs(startTimeMs)))
       }

       updatedBlocks
   }[/mw_shl_code]

对于 doPut 函数,主要做了以下几个操作:
1)创建 BlockInfo 对象存储 block 信息。
2)将 BlockInfo 加锁,然后根据 Storage Level 判断存储到 Memory 还是 Disk。同时,对于已经准备好读的 BlockInfo 要进行解锁。
3)根据 block 的副本数量决定是否向远程发送副本。

11.5.1 序列化与否

写入的具体内容可以是序列化之后的 bytes 也可以是没有序列化的 value. 此处有一个对 scala 的语法中 Either, Left, Right 关键字的理解。

11.6 数据读取过程分析

[mw_shl_code=scala,true]def get(blockId: BlockId): Option[Iterator[Any]] = {
    val local = getLocal(blockId)
    if (local.isDefined) {
        logInfo("Found block %s locally".format(blockId))
        return local
    }
    val remote = getRemote(blockId)
    if (remote.isDefined) {
        logInfo("Found block %s remotely".format(blockId))
        return remote
    }
    None
}[/mw_shl_code]

11.6.1 本地读取

首先在查询本机的 MemoryStore 和 DiskStore 中是否有所需要的 block 数据存在,如果没有则发起远程数据获取。

11.6.2 远程读取

远程获取调用路径, getRemote --> doGetRemote, 在 doGetRemote 中最主要的就是调用 BlockManagerWorker.syncGetBlock 来从远程获得数据。

[mw_shl_code=scala,true]def syncGetBlock(msg: GetBlock, toConnManagerId: ConnectionManagerId): ByteBuffer = {
    val blockManager = blockManagerWorker.blockManager
    val connectionManager = blockManager.connectionManager
    val blockMessage = BlockMessage.fromGetBlock(msg)
    val blockMessageArray = new BlockMessageArray(blockMessage)
    val responseMessage = connectionManager.sendMessageReliablySync(
            toConnManagerId, blockMessageArray.toBufferMessage)
    responseMessage match {
        case Some(message) => {
            val bufferMessage = message.asInstanceOf[BufferMessage]
            logDebug("Response message received " + bufferMessage)
            BlockMessageArray.fromBufferMessage(bufferMessage).foreach(blockMessage => {
                    logDebug("Found " + blockMessage)
            return blockMessage.getData
      })
        }
        case None => logDebug("No response message received")
    }
    null
}[/mw_shl_code]

上述这段代码中最有意思的莫过于 sendMessageReliablySync,远程数据读取毫无疑问是一个异步 i/o 操作,这里的代码怎么写起来就像是在进行同步的操作一样呢。也就是说如何知道对方发送回来响应的呢?
别急,继续去看看 sendMessageReliablySync 的定义:

[mw_shl_code=scala,true]def sendMessageReliably(connectionManagerId: ConnectionManagerId, message: Message)
  : Future[Option[Message]] = {
    val promise = Promise[Option[Message]]
    val status = new MessageStatus(
            message, connectionManagerId, s => promise.success(s.ackMessage))
    messageStatuses.synchronized {
        messageStatuses += ((message.id, status))
    }
    sendMessage(connectionManagerId, message)
    promise.future
}[/mw_shl_code]

要是我说秘密在这里,你肯定会说我在扯淡,但确实在此处。注意到关键字 Promise 和 Future 没?
如果这个 future 执行完毕,返回 s.ackMessage。我们再看看这个 ackMessage 是在什么地方被写入的呢。看一看 ConnectionManager.handleMessage 中的代码片段:

[mw_shl_code=scala,true]case bufferMessage: BufferMessage =>

{
    if (authEnabled) {
        val res = handleAuthentication(connection, bufferMessage)
        if (res == true) {
            // message was security negotiation so skip the rest
            logDebug("After handleAuth result was true, returning")
            return
        }
    }
    if (bufferMessage.hasAckId) {
        val sentMessageStatus = messageStatuses. synchronized {
            messageStatuses.get(bufferMessage.ackId) match {
                case Some(status) =>{
                    messageStatuses -= bufferMessage.ackId
                    status
                }
                case None =>{
                    throw new Exception("Could not find reference for received ack message " +
                            message.id)
                    null
                }
            }
        }
        sentMessageStatus. synchronized {
            sentMessageStatus.ackMessage = Some(message)
            sentMessageStatus.attempted = true
            sentMessageStatus.acked = true
            sentMessageStaus.markDone()
        }
    }
}[/mw_shl_code]

注意:此处的所调用的 sentMessageStatus.markDone 就会调用在 sendMessageReliablySync 中定义的 promise.Success,不妨看看 MessageStatus 的定义。

[mw_shl_code=scala,true]class MessageStatus(
val message: Message,
val connectionManagerId: ConnectionManagerId,
completionHandler: MessageStatus => Unit) {

    var ackMessage: Option[Message] = None
    var attempted = false
    var acked = false

    def markDone() { completionHandler(this) }
}[/mw_shl_code]

11.7 Partition 如何转化为 Block

在 storage 模块里面所有的操作都是和 block 相关的,但是在 RDD 里面所有的运算都是基于 partition 的,那么 partition 是如何与 block 对应上的呢?
RDD 计算的核心函数是 iterator() 函数:

[mw_shl_code=scala,true]final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
    if (storageLevel != StorageLevel.NONE) {
        SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
    } else {
        computeOrReadCheckpoint(split, context)
    }
}[/mw_shl_code]

如果当前 RDD 的 storage level 不是 NONE 的话,表示该 RDD 在 BlockManager 中有存储,那么调用 CacheManager 中的 getOrCompute() 函数计算 RDD,在这个函数中 partition 和 block 发生了关系:
首先根据 RDD id 和 partition index 构造出 block id (rdd_xx_xx),接着从 BlockManager 中取出相应的 block。
如果该 block 存在,表示此 RDD 在之前已经被计算过和存储在 BlockManager 中,因此取出即可,无需再重新计算。
如果该 block 不存在则需要调用 RDD 的 computeOrReadCheckpoint() 函数计算出新的 block,并将其存储到 BlockManager 中。
需要注意的是 block 的计算和存储是阻塞的,若另一线程也需要用到此 block 则需等到该线程 block 的 loading 结束。

[mw_shl_code=scala,true]def getOrCompute[T](rdd:RDD[T],split:Partition,context:TaskContext,storageLevel:StorageLevel):Iterator[T]=
{
    val key = "rdd_%d_%d".format(rdd.id, split.index)
    logDebug("Looking for partition " + key)
    blockManager.get(key) match {
    case Some(values) =>
        // Partition is already materialized, so just return its values
        return values.asInstanceOf[Iterator[T]]

    case None =>
        // Mark the split as loading (unless someone else marks it first)
        loading. synchronized {
        if (loading.contains(key)) {
            logInfo("Another thread is loading %s, waiting for it to finish...".format(key))
            while (loading.contains(key)) {
                try {
                    loading.wait()
                } catch {
                    case _:
                        Throwable =>}
            }
            logInfo("Finished waiting for %s".format(key))
            // See whether someone else has successfully loaded it. The main way this would fail
            // is for the RDD-level cache eviction policy if someone else has loaded the same RDD
            // partition but we didn't want to make space for it. However, that case is unlikely
            // because it's unlikely that two threads would work on the same RDD partition. One
            // downside of the current code is that threads wait serially if this does happen.
            blockManager.get(key) match {
                case Some(values) =>
                    return values.asInstanceOf[Iterator[T]]
                case None =>
                    logInfo("Whoever was loading %s failed; we'll try it ourselves".format(key))
                    loading.add(key)
            }
        } else {
            loading.add(key)
        }
    }
    try {
        // If we got here, we have to load the split
        logInfo("Partition %s not found, computing it".format(key))
        val computedValues = rdd.computeOrReadCheckpoint(split, context)
        // Persist the result, so long as the task is not running locally
        if (context.runningLocally) {
            return computedValues
        }
        val elements = new ArrayBuffer[Any]
        elements++ = computedValues
        blockManager.put(key, elements, storageLevel, true)
        return elements.iterator.asInstanceOf[Iterator[T]]
    } finally {
        loading. synchronized {
            loading.remove(key)
            loading.notifyAll()
        }
    }
}[/mw_shl_code]

这样 RDD 的 transformation、action 就和 block 数据建立了联系,虽然抽象上我们的操作是在 partition 层面上进行的,但是 partitio n最终还是被映射成为 block,因此实际上我们的所有操作都是对 block 的处理和存取。

11.8 partition 和 block 的对应关系

在 RDD 中,核心的函数是 iterator:

[mw_shl_code=scala,true]final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
    if (storageLevel != StorageLevel.NONE) {
        SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
    } else {
        computeOrReadCheckpoint(split, context)
    }
}[/mw_shl_code]

如果当前 RDD 的 storage level 不是 NONE 的话,表示该 RDD 在 BlockManager 中有存储,那么调用 CacheManager 中的 getOrCompute 函数计算 RDD,在这个函数中 partition 和 block 就对应起来了:
getOrCompute 函数会先构造 RDDBlockId,其中 RDDBlockId 就把 block 和 partition 联系起来了,RDDBlockId 产生的 name 就是 BlockId 的 name 属性,形式是:rdd_rdd.id_partition.index。

[mw_shl_code=scala,true]def getOrCompute[T](
rdd: RDD[T],
partition: Partition,
context: TaskContext,
storageLevel: StorageLevel): Iterator[T] = {

    val key = RDDBlockId(rdd.id, partition.index)
    logDebug(s"Looking for partition $key")
    blockManager.get(key) match {
        case Some(blockResult) =>
            val existingMetrics = context.taskMetrics
                    .getInputMetricsForReadMethod(blockResult.readMethod)
            existingMetrics.incBytesRead(blockResult.bytes)

            val iter = blockResult.data.asInstanceOf[Iterator[T]]
            new InterruptibleIterator[T](context, iter) {
            override def next(): T = {
                    existingMetrics.incRecordsRead(1)
                    delegate.next()
            }
        }
        case None =>
            val storedValues = acquireLockForPartition[T](key)
            if (storedValues.isDefined) {
                return new InterruptibleIterator[T](context, storedValues.get)
            }

            try {
                logInfo(s"Partition $key not found, computing it")
                val computedValues = rdd.computeOrReadCheckpoint(partition, context)

                if (context.isRunningLocally) {
                    return computedValues
                }

                val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
                val cachedValues = putInBlockManager(key, computedValues, storageLevel, updatedBlocks)
                val metrics = context.taskMetrics
                val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]())
                metrics.updatedBlocks = Some(lastUpdatedBlocks ++ updatedBlocks.toSeq)
                new InterruptibleIterator(context, cachedValues)

            } finally {
                loading.synchronized {
                    loading.remove(key)
                    loading.notifyAll()
                }
            }
    }
}
[/mw_shl_code]
同时 getOrCompute 函数会对 block 进行判断:
如果该 block 存在,表示此 RDD 在之前已经被计算过和存储在 BlockManager 中,因此取出即可,无需再重新计算。
如果该 block 不存在则需要调用 RDD 的 computeOrReadCheckpoint() 函数计算出新的block,并将其存储到 BlockManager 中。
需要注意的是 block 的计算和存储是阻塞的,若另一线程也需要用到此 block 则需等到该线程 block 的 loading 结束。

第12章 Spark Shuffle 过程
12.1 MapReduce 的 Shuffle 过程介绍

    Shuffle 的本义是洗牌、混洗,把一组有一定规则的数据尽量转换成一组无规则的数据,越随机越好。MapReduce 中的 Shuffle 更像是洗牌的逆过程,把一组无规则的数据尽量转换成一组具有一定规则的数据。
    为什么 MapReduce 计算模型需要 Shuffle 过程?我们都知道 MapReduce 计算模型一般包括两个重要的阶段:Map 是映射,负责数据的过滤分发;Reduce 是规约,负责数据的计算归并。Reduce 的数据来源于 Map,Map 的输出即是 Reduce 的输入,Reduce 需要通过 Shuffle来 获取数据。
    从 Map 输出到 Reduce 输入的整个过程可以广义地称为 Shuffle。Shuffle 横跨 Map 端和 Reduce 端,在 Map 端包括 Spill 过程,在 Reduce 端包括 copy 和 sort 过程,如图所示:
EtjOJS.png

12.1.1 Spill 过程(刷写过程)

    Spill 过程包括输出、排序、溢写、合并等步骤,如图所示:
EtjxMj.png

Collect

    每个 Map 任务不断地以 <key, value> 对的形式把数据输出到内存中构造的一个环形数据结构中。使用环形数据结构是为了更有效地使用内存空间,在内存中放置尽可能多的数据。
    这个数据结构其实就是个字节数组,叫 kvbuffer,名如其义,但是这里面不光放置了 <key, value> 数据,还放置了一些索引数据,给放置索引数据的区域起了一个 kvmeta 的别名,在 kvbuffer 的一块区域上穿了一个 IntBuffer(字节序采用的是平台自身的字节序)的马甲。<key, value> 数据区域和索引数据区域在 kvbuffer 中是相邻不重叠的两个区域,用一个分界点来划分两者,分界点不是亘古不变的,而是每次 Spill 之后都会更新一次。初始的分界点是 0,<key, value> 数据的存储方向是向上增长,索引数据的存储方向是向下增长,如图所示:
EtvEz4.png

    kvbuffer 的存放指针 bufindex 是一直闷着头地向上增长,比如 bufindex 初始值为 0,一个 Int 型的 key 写完之后,bufindex 增长为 4,一个 Int 型的 value 写完之后,bufindex 增长为 8。
    索引是对 <key, value> 在 kvbuffer 中的索引,是个四元组,包括:value 的起始位置、key 的起始位置、partition 值、value 的长度,占用四个 Int 长度,kvmeta 的存放指针 kvindex 每次都是向下跳四个“格子”,然后再向上一个格子一个格子地填充四元组的数据。比如 Kvindex 初始位置是 -4,当第一个 <key, value> 写完之后,(kvindex+0) 的位置存放 value 的起始位置、(kvindex+1) 的位置存放 key 的起始位置、(kvindex+2) 的位置存放 partition 的值、(kvindex+3) 的位置存放 value 的长度,然后 kvindex 跳到 -8 位置,等第二个 <key, value> 和索引写完之后,kvindex 跳到-32 位置。

    kvbuffer 的大小虽然可以通过参数设置,但是总共就那么大,<key, value> 和索引不断地增加,加着加着,kvbuffer 总有不够用的那天,那怎么办?把数据从内存刷到磁盘上再接着往内存写数据,把 kvbuffer 中的数据刷到磁盘上的过程就叫 Spill,多么明了的叫法,内存中的数据满了就自动地 spill 到具有更大空间的磁盘。

    关于 Spill 触发的条件,也就是 kvbuffer 用到什么程度开始 Spill,还是要讲究一下的。如果把 kvbuffer 用得死死得,一点缝都不剩的时候再开始 Spill,那 Map 任务就需要等 Spill 完成腾出空间之后才能继续写数据;如果 kvbuffer 只是满到一定程度,比如 80% 的时候就开始 Spill,那在 Spill 的同时,Map 任务还能继续写数据,如果 Spill 够快,Map 可能都不需要为空闲空间而发愁。两利相衡取其大,一般选择后者。

    Spill 这个重要的过程是由 Spill 线程承担,Spill 线程从 Map 任务接到“命令”之后就开始正式干活,干的活叫 SortAndSpill,原来不仅仅是 Spill,在 Spill 之前还有个颇具争议性的 Sort。

Sort

    先把 kvbuffer 中的数据按照 partition 值和 key 两个关键字升序排序,移动的只是索引数据,排序结果是 kvmeta 中数据按照 partition 为单位聚集在一起,同一 partition 内的按照 key 有序。

Spill


    Spill 线程为这次 Spill 过程创建一个磁盘文件:从所有的本地目录中轮询查找能存储这么大空间的目录,找到之后在其中创建一个类似于 “spill12.out” 的文件。Spill 线程根据排过序的 kvmeta 挨个 partition 的把 <key, value> 数据吐到这个文件中,一个 partition 对应的数据吐完之后顺序地吐下个 partition,直到把所有的 partition 遍历完。一个 partition 在文件中对应的数据也叫段 (segment)。

    所有的 partition 对应的数据都放在这个文件里,虽然是顺序存放的,但是怎么直接知道某个 partition 在这个文件中存放的起始位置呢?强大的索引又出场了。有一个三元组记录某个 partition 对应的数据在这个文件中的索引:起始位置、原始数据长度、压缩之后的数据长度,一个 partition 对应一个三元组。然后把这些索引信息存放在内存中,如果内存中放不下了,后续的索引信息就需要写到磁盘文件中了:从所有的本地目录中轮训查找能存储这么大空间的目录,找到之后在其中创建一个类似于 “spill12.out.index” 的文件,文件中不光存储了索引数据,还存储了 crc32 的校验数据。(spill12.out.index 不一定在磁盘上创建,如果内存(默认 1M 空间)中能放得下就放在内存中,即使在磁盘上创建了,和 spill12.out 文件也不一定在同一个目录下。)

    每一次 Spill 过程就会最少生成一个 out 文件,有时还会生成 index 文件,Spill 的次数也烙印在文件名中。索引文件和数据文件的对应关系如下图所示:
Etjzss.png

    在 Spill 线程如火如荼的进行 SortAndSpill 工作的同时,Map 任务不会因此而停歇,而是一无既往地进行着数据输出。Map 还是把数据写到 kvbuffer 中,那问题就来了:<key, value> 只顾着闷头按照 bufindex 指针向上增长,kvmeta 只顾着按照 kvindex 向下增长,是保持指针起始位置不变继续跑呢,还是另谋它路?如果保持指针起始位置不变,很快 bufindex 和 kvindex 就碰头了,碰头之后再重新开始或者移动内存都比较麻烦,不可取。Map 取 kvbuffer 中剩余空间的中间位置,用这个位置设置为新的分界点,bufindex 指针移动到这个分界点,kvindex 移动到这个分界点的 -16 位置,然后两者就可以和谐地按照自己既定的轨迹放置数据了,当 Spill 完成,空间腾出之后,不需要做任何改动继续前进。分界点的转换如下图所示:
EtjjzQ.png

    Map 任务总要把输出的数据写到磁盘上,即使输出数据量很小在内存中全部能装得下,在最后也会把数据刷到磁盘上。

     

12.1.2 Merge
EtvSLn.png

    Map 任务如果输出数据量很大,可能会进行好几次 Spill,out 文件和 Index 文件会产生很多,分布在不同的磁盘上。最后把这些文件进行合并的 merge 过程闪亮登场。
    Merge 过程怎么知道产生的 Spill 文件都在哪了呢?从所有的本地目录上扫描得到产生的 Spill 文件,然后把路径存储在一个数组里。Merge 过程又怎么知道 Spill 的索引信息呢?没错,也是从所有的本地目录上扫描得到 Index 文件,然后把索引信息存储在一个列表里。到这里,又遇到了一个值得纳闷的地方。在之前 Spill 过程中的时候为什么不直接把这些信息存储在内存中呢,何必又多了这步扫描的操作?特别是 Spill 的索引数据,之前当内存超限之后就把数据写到磁盘,现在又要从磁盘把这些数据读出来,还是需要装到更多的内存中。之所以多此一举,是因为这时 kvbuffer 这个内存大户已经不再使用可以回收,有内存空间来装这些数据了。(对于内存空间较大的土豪来说,用内存来省却这两个 io 步骤还是值得考虑的。)

     
    然后为 merge 过程创建一个叫 file.out 的文件和一个叫 file.out.Index 的文件用来存储最终的输出和索引。
    一个 partition 一个 partition 的进行合并输出。对于某个 partition 来说,从索引列表中查询这个 partition 对应的所有索引信息,每个对应一个段插入到段列表中。也就是这个 partition 对应一个段列表,记录所有的 Spill 文件中对应的这个 partition 那段数据的文件名、起始位置、长度等等。
    然后对这个 partition 对应的所有的 segment 进行合并,目标是合并成一个 segment。当这个 partition 对应很多个 segment 时,会分批地进行合并:先从 segment 列表中把第一批取出来,以 key 为关键字放置成最小堆,然后从最小堆中每次取出最小的 <key, value> 输出到一个临时文件中,这样就把这一批段合并成一个临时的段,把它加回到 segment 列表中;再从 segment 列表中把第二批取出来合并输出到一个临时 segment,把其加入到列表中;这样往复执行,直到剩下的段是一批,输出到最终的文件中。
    最终的索引数据仍然输出到 Index 文件中。
    Map 端的 Shuffle 过程到此结束。

12.1.3 Copy

    Reduce 任务通过 HTTP 向各个 Map 任务拖取它所需要的数据。每个节点都会启动一个常驻的 HTTP server,其中一项服务就是响应 Reduce 拖取 Map 数据。当有 MapOutput 的 HTTP 请求过来的时候,HTTP server 就读取相应的 Map 输出文件中对应这个 Reduce 部分的数据通过网络流输出给 Reduce。
    Reduce 任务拖取某个 Map 对应的数据,如果在内存中能放得下这次数据的话就直接把数据写到内存中。Reduce 要向每个 Map 去拖取数据,在内存中每个 Map 对应一块数据,当内存中存储的 Map 数据占用空间达到一定程度的时候,开始启动内存中 merge,把内存中的数据 merge 输出到磁盘上一个文件中。
    如果在内存中不能放得下这个 Map 的数据的话,直接把 Map 数据写到磁盘上,在本地目录创建一个文件,从 HTTP 流中读取数据然后写到磁盘,使用的缓存区大小是 64K。拖一个 Map 数据过来就会创建一个文件,当文件数量达到一定阈值时,开始启动磁盘文件 merge,把这些文件合并输出到一个文件。
    有些 Map 的数据较小是可以放在内存中的,有些 Map 的数据较大需要放在磁盘上,这样最后 Reduce 任务拖过来的数据有些放在内存中了有些放在磁盘上,最后会对这些来一个全局合并。

12.1.4 Merge Sort

    这里使用的 Merge 和 Map 端使用的 Merge 过程一样。Map 的输出数据已经是有序的,Merge 进行一次合并排序,所谓 Reduce 端的 sort 过程就是这个合并的过程。一般 Reduce 是一边 copy 一边 sort,即 copy 和 sort 两个阶段是重叠而不是完全分开的。
    Reduce 端的 Shuffle 过程至此结束。

12.2 HashShuffle 过程介绍

    Spark 丰富了任务类型,有些任务之间数据流转不需要通过 Shuffle,但是有些任务之间还是需要通过 Shuffle 来传递数据,比如 wide dependency 的 group by key。
    Spark 中需要 Shuffle 输出的 Map 任务会为每个 Reduce 创建对应的 bucket,Map 产生的结果会根据设置的 partitioner 得到对应的 bucketId,然后填充到相应的 bucket 中去。每个 Map 的输出结果可能包含所有的 Reduce 所需要的数据,所以每个 Map 会创建 R 个 bucket(R 是 reduce 的个数),M 个 Map 总共会创建 M*R 个 bucket。
    Map 创建的 bucket 其实对应磁盘上的一个文件,Map 的结果写到每个 bucket 中其实就是写到那个磁盘文件中,这个文件也被称为 blockFile,是 Disk Block Manager 管理器通过文件名的 Hash 值对应到本地目录的子目录中创建的。每个 Map 要在节点上创建 R 个磁盘文件用于结果输出,Map 的结果是直接输出到磁盘文件上的,100KB 的内存缓冲是用来创建 Fast Buffered OutputStream 输出流。这种方式一个问题就是 Shuffle 文件过多。
   
Etv9Zq.png
    1)每一个 Mapper 创建出和 Reducer 数目相同的 bucket,bucket 实际上是一个 buffer,其大小为 spark.shuffle.file.buffer.kb(默认 32KB)。
    2)Mapper 产生的结果会根据设置的 partition 算法填充到每个 bucket 中去,然后再写入到磁盘文件。
    3)Reducer 从远端或是本地的 block manager 中找到相应的文件读取数据。


    针对上述 Shuffle 过程产生的文件过多问题,Spark 有另外一种改进的 Shuffle 过程:consolidation Shuffle,以期显著减少 Shuffle 文件的数量。在 consolidation Shuffle 中每个 bucket 并非对应一个文件,而是对应文件中的一个 segment 部分。Job 的 map 在某个节点上第一次执行,为每个 reduce 创建 bucke 对应的输出文件,把这些文件组织成 ShuffleFileGroup,当这次 map 执行完之后,这个 ShuffleFileGroup 可以释放为下次循环利用;当又有 map 在这个节点上执行时,不需要创建新的 bucket 文件,而是在上次的 ShuffleFileGroup 中取得已经创建的文件继续追加写一个 segment;当前次 map 还没执行完,ShuffleFileGroup 还没有释放,这时如果有新的 map 在这个节点上执行,无法循环利用这个 ShuffleFileGroup,而是只能创建新的 bucket 文件组成新的 ShuffleFileGroup 来写输出。
   
EtvCd0.png
    比如一个 Job 有 3 个 Map 和 2 个 reduce:
    (1) 如果此时集群有 3 个节点有空槽,每个节点空闲了一个 core,则 3 个 Map 会调度到这 3 个节点上执行,每个 Map 都会创建 2 个 Shuffle 文件,总共创建 6 个 Shuffle 文件;
    (2) 如果此时集群有 2 个节点有空槽,每个节点空闲了一个 core,则 2 个 Map 先调度到这 2 个节点上执行,每个 Map 都会创建 2 个 Shuffle 文件,然后其中一个节点执行完 Map 之后又调度执行另一个 Map,则这个 Map 不会创建新的 Shuffle 文件,而是把结果输出追加到之前 Map 创建的 Shuffle 文件中;总共创建 4 个 Shuffle 文件;
    (3) 如果此时集群有 2 个节点有空槽,一个节点有 2 个空 core 一个节点有 1 个空 core,则一个节点调度 2 个 Map 一个节点调度 1 个 Map,调度 2 个 Map 的节点上,一个 Map 创建了 Shuffle 文件,后面的 Map 还是会创建新的 Shuffle 文件,因为上一个 Map 还正在写,它创建的 ShuffleFileGroup 还没有释放;总共创建 6 个 Shuffle 文件。
    优点:
    1)快-不需要排序,也不需要维持 hash 表
    2)不需要额外空间用作排序
    3)不需要额外IO-数据写入磁盘只需一次,读取也只需一次
    缺点:
    1)当 partitions 大时,输出大量的文件(cores * R),性能开始降低
    2)大量的文件写入,使文件系统开始变为随机写,性能比顺序写要降低 100 倍
    3)缓存空间占用比较大
    Reduce 去拖 Map 的输出数据,Spark 提供了两套不同的拉取数据框架:通过 socket 连接去取数据;使用n etty 框架去取数据。
    每个节点的 Executor 会创建一个 BlockManager,其中会创建一个 BlockManagerWorker 用于响应请求。当 Reduce 的 GET_BLOCK 的请求过来时,读取本地文件将这个 blockId 的数据返回给 Reduce。如果使用的是 Netty 框架,BlockManager 会创建 ShuffleSender 用于发送 Shuffle 数据。

    并不是所有的数据都是通过网络读取,对于在本节点的 Map 数据,Reduce 直接去磁盘上读取而不再通过网络框架。
    Reduce 拖过来数据之后以什么方式存储呢?Spark Map 输出的数据没有经过排序,Spark Shuffle 过来的数据也不会进行排序,Spark 认为 Shuffle 过程中的排序不是必须的,并不是所有类型的 Reduce 需要的数据都需要排序,强制地进行排序只会增加 Shuffle 的负担。Reduce 拖过来的数据会放在一个 HashMap 中,HashMap 中存储的也是 <key, value> 对,key 是 Map 输出的 key,Map 输出对应这个 key 的所有 value 组成 HashMap 的 value。Spark 将 Shuffle 取过来的每一个 <key, value> 对插入或者更新到 HashMap 中,来一个处理一个。HashMap 全部放在内存中。
    Shuffle 取过来的数据全部存放在内存中,对于数据量比较小或者已经在 Map 端做过合并处理的 Shuffle 数据,占用内存空间不会太大,但是对于比如 group by key 这样的操作,Reduce 需要得到 key 对应的所有 value,并将这些 value 组一个数组放在内存中,这样当数据量较大时,就需要较多内存。
    当内存不够时,要不就失败,要不就用老办法把内存中的数据移到磁盘上放着。Spark 意识到在处理数据规模远远大于内存空间时所带来的不足,引入了一个具有外部排序的方案。Shuffle 过来的数据先放在内存中,当内存中存储的 <key, value> 对超过 1000 并且内存使用超过 70% 时,判断节点上可用内存如果还足够,则把内存缓冲区大小翻倍,如果可用内存不再够了,则把内存中的 <key, value> 对排序然后写到磁盘文件中。最后把内存缓冲区中的数据排序之后和那些磁盘文件组成一个最小堆,每次从最小堆中读取最小的数据,这个和 MapReduce 中的 merge 过程类似。

12.3 SortShuffle 过程介绍


    从 1.2.0 开始默认为 sort shuffle(spark.shuffle.manager = sort),实现逻辑类似于 Hadoop MapReduce,Hash Shuffle 每一个 reducers 产生一个文件,但是 Sort Shuffle 只是产生一个按照 reducer id 排序可索引的文件,这样,只需获取有关文件中的相关数据块的位置信息,并 fseek 就可以读取指定 reducer 的数据。但对于 rueducer 数比较少的情况,Hash Shuffle 明显要比 Sort Shuffle 快,因此 Sort Shuffle 有个 “fallback” 计划,对于 reducers 数少于 “spark.shuffle.sort.bypassMergeThreshold” (200 by default),我们使用 fallback 计划,hashing 相关数据到分开的文件,然后合并这些文件为一个,具体实现为 BypassMergeSortShuffleWriter。
   
EtvFiT.png
    在 map 进行排序,在 reduce 端应用 Timsort[1] 进行合并。map 端是否容许 spill,通过 spark.shuffle.spill 来设置,默认是 true。设置为 false,如果没有足够的内存来存储 map 的输出,那么就会导致 OOM 错误,因此要慎用。
    用于存储 map 输出的内存为:“JVM Heap Size” * spark.shuffle.memoryFraction * spark.shuffle.safetyFraction,默认为: “JVM Heap Size” * 0.2 * 0.8 = “JVM Heap Size” * 0.16。如果你在同一个执行程序中运行多个线程(设定 spark.executor.cores/ spark.task.cpus 超过 1),每个 map 任务存储的空间为 “JVM Heap Size” * spark.shuffle.memoryFraction * spark.shuffle.safetyFraction / spark.executor.cores * spark.task.cpus,默认 2 个 cores,那么为 0.08 * “JVM Heap Size”。
   
EtvPoV.png
    spark 使用 AppendOnlyMap 存储 map 输出的数据,利用开源 hash 函数 MurmurHash3 和平方探测法把 key 和 value 保存在相同的 array 中。这种保存方法可以是 spark 进行 combine。如果 spill 为 true,会在 spill 前 sort。
    与 hash shuffle 相比,sort shuffle 中每个 Mapper 只产生一个数据文件和一个索引文件,数据文件中的数据按照 Reducer 排序,但属于同一个 Reducer 的数据不排序。Mapper 产生的数据先放到 AppendOnlyMap 这个数据结构中,如果内存不够,数据则会 spill 到磁盘,最后合并成一个文件。
    与 Hash shuffle 相比,shuffle 文件数量减少,内存使用更加可控。但排序会影响速度。
    优点:
    1)map 创建文件量较少。
    2)少量的 IO 随机操作,大部分是顺序读写。
    缺点:
    1)要比 Hash Shuffle 要慢,需要自己通过 spark.shuffle.sort.bypassMergeThreshold 来设置合适的值。
    2)如果使用 SSD 盘存储 shuffle 数据,那么 Hash Shuffle 可能更合适。

12.4 TungstenShuffle 过程介绍

    Tungsten-sort 算不得一个全新的 shuffle 方案,它在特定场景下基于类似现有的 Sort Based Shuffle 处理流程,对内存 /CPU/Cache 使用做了非常大的优化。带来高效的同时,也就限定了自己的使用场景。如果 Tungsten-sort 发现自己无法处理,则会自动使用 Sort Based Shuffle 进行处理。Tungsten 中文是钨丝的意思。 Tungsten Project 是 Databricks 公司提出的对 Spark 优化内存和 CPU 使用的计划,该计划初期似乎对 Spark SQL 优化的最多。不过部分 RDD API 还有 Shuffle 也因此受益。
    Tungsten-sort 优化点主要在三个方面:
    1)直接在 serialized binary data 上 sort 而不是 java objects,减少了 memory 的开销和 GC 的 overhead。
    2)提供 cache-efficient sorter,使用一个 8bytes 的指针,把排序转化成了一个指针数组的排序。
    3)spill 的 merge 过程也无需反序列化即可完成。

    这些优化的实现导致引入了一个新的内存管理模型,类似 OS 的 Page,对应的实际数据结构为 MemoryBlock,支持 off-heap 以及 in-heap 两种模式。为了能够对 Record 在这些 MemoryBlock 进行定位,引入了 Pointer(指针)的概念。
    如果你还记得 Sort Based Shuffle 里存储数据的对象 PartitionedAppendOnlyMap,这是一个放在 JVM heap 里普通对象,在 Tungsten-sort 中,他被替换成了类似操作系统内存页的对象。如果你无法申请到新的 Page,这个时候就要执行 spill 操作,也就是写入到磁盘的操作。具体触发条件,和 Sort Based Shuffle 也是类似的。
    Spark 默认开启的是 Sort Based Shuffle,想要打开 Tungsten-sort,请设置
    spark.shuffle.manager=tungsten-sort
    对应的实现类是:org.apache.spark.shuffle.unsafe.UnsafeShuffleManager
    名字的来源是因为使用了大量 JDK Sun Unsafe API。

    当且仅当下面条件都满足时,才会使用新的 Shuffle 方式:
    1)Shuffle dependency 不能带有 aggregation 或者输出需要排序
    2)Shuffle 的序列化器需要是 KryoSerializer 或者 Spark SQL's 自定义的一些序列化方式.
    3)Shuffle 文件的数量不能大于 16777216。
    4)序列化时,单条记录不能大于 128 MB。
    可以看到,能使用的条件还是挺苛刻的。
    这些限制来源于哪里
    参看如下代码,page 的大小:
    this.pageSizeBytes = (int) Math.min(PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES,shuffleMemoryManager.pageSizeBytes());
    这就保证了页大小不超过 PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES 的值,该值就被定义成了 128M。
    而产生这个限制的具体设计原因,我们还要仔细分析下 Tungsten 的内存模型,如下图所示:
   
EtvAWF.png
    这张图其实画的是 on-heap 的内存逻辑图,其中 #Page 部分为 13bit,Offset 为 51bit,你会发现 2^51 >> 128M 的。但是在 Shuffle 的过程中,对 51bit 做了压缩,使用了 27bit,具体如下:
    [24 bit partition number][13 bit memory page number][27 bit offset in page]
    这里预留出的 24bi t给了 partition number,为了后面的排序用。上面的好几个限制其实都是因为这个指针引起的:
    第一个是 partition 的限制,前面的数字 16777216 就是来源于 partition number 使用 24bit 表示的。
    第二个是 page number。
    第三个是偏移量,最大能表示到 2^27=128M。那一个 Task 能管理到的内存是受限于这个指针的,最多是 2^13 * 128M 也就是 1TB 左右。

    有了这个指针,我们就可以定位和管理到 off-heap 或者 on-heap 里的内存了。这个模型还是很漂亮的,内存管理也非常高效,记得之前的预估 PartitionedAppendOnlyMap 的内存是非常困难的,但是通过现在的内存管理机制,是非常快速并且精确的。
    对于第一个限制,那是因为后续 Shuffle Write 的 sort 部分,只对前面 24bit 的 partiton number 进行排序,key 的值没有被编码到这个指针,所以没办法进行 ordering。
    同时,因为整个过程是追求不反序列化的,所以不能做 aggregation。

    Shuffle Write
    核心类:
    org.apache.spark.shuffle.unsafe.UnsafeShuffleWriter
    数据会通过 UnsafeShuffleExternalSorter.insertRecordIntoSorter 一条一条写入到 serOutputStream 序列化输出流。
    这里消耗内存的地方是
    serBuffer = new MyByteArrayOutputStream(1024 * 1024)
    默认是 1M,类似于 Sort Based Shuffle 中的 ExternalSorter,在 Tungsten Sort 对应的为 UnsafeShuffleExternalSorter,记录序列化后就通过 sorter.insertRecord 方法放到 sorter 里去了。
    这里 sorter 负责申请 Page,释放 Page,判断是否要进行 spill 都这个类里完成。代码的架子其实和 Sort Based 是一样的。
   
EtvkJU.png
    (另外,值得注意的是,这张图里进行 spill 操作的同时检查内存可用而导致的 Exeception 的 bug 已经在 1.5.1 版本被修复了,忽略那条路径)
    内存是否充足的条件依然 shuffleMemoryManager 来决定,也就是所有 Task Shuffle 申请的 Page 内存总和不能大于下面的值:
    ExecutorHeapMemeory * 0.2 * 0.8
    上面的数字可通过下面两个配置来更改:
    spark.shuffle.memoryFraction=0.2
    spark.shuffle.safetyFraction=0.8
    UnsafeShuffleExternalSorter 负责申请内存,并且会生成该条记录最后的逻辑地址,也就前面提到的 Pointer。
    接着 Record 会继续流转到 UnsafeShuffleInMemorySorter 中,这个对象维护了一个指针数组:
    private long[] pointerArray;
    数组的初始大小为 4096,后续如果不够了,则按每次两倍大小进行扩充。
    假设 100 万条记录,那么该数组大约是 8M 左右,所以其实还是很小的。一旦 spill 后该 UnsafeShuffleInMemorySorter 就会被赋为 null,被回收掉。
    我们回过头来看 spill,其实逻辑上也异常简单了,UnsafeShuffleInMemorySorter 会返回一个迭代器,该迭代器粒度每个元素就是一个指针,然后到根据该指针可以拿到真实的 record,然后写入到磁盘,因为这些 record 在一开始进入 UnsafeShuffleExternalSorter 就已经被序列化了,所以在这里就纯粹变成写字节数组了。形成的结构依然和 Sort Based Shuffle 一致,一个文件里不同的 partiton 的数据用 fileSegment 来表示,对应的信息存在一个 index 文件里。
    另外写文件的时候也需要一个 buffer:
    spark.shuffle.file.buffer=32k
    另外从内存里拿到数据放到 DiskWriter,这中间还要有个中转,是通过:
    final byte[] writeBuffer = new byte[DISK_WRITE_BUFFER_SIZE=1024 * 1024];
    来完成的,都是内存,所以很快。
    Task 结束前,我们要做一次 mergeSpills 操作,然后形成一个 shuffle 文件。这里面其实也挺复杂的,
    如果开启了
    spark.shuffle.unsafe.fastMergeEnabled=true
    并且没有开启
    spark.shuffle.compress=true
    或者压缩方式为:
    LZFCompressionCodec
    则可以非常高效的进行合并,叫做 transferTo。不过无论是什么合并,都不需要进行反序列化。

    Shuffle Read
    Shuffle Read 完全复用 HashShuffleReader,具体参看 Sort-Based Shuffle。

12.5 MapReduce 与 Spark 过程对比

MapReduce 和 Spark 的 Shuffle 过程对比如下:
EtvmLR.png

第13章 Spark 内存管理

    Spark 作为一个基于内存的分布式计算引擎,其内存管理模块在整个系统中扮演着非常重要的角色。理解 Spark 内存管理的基本原理,有助于更好地开发 Spark 应用程序和进行性能调优。本文中阐述的原理基于 Spark 2.1 版本。
    在执行 Spark 的应用程序时,Spark 集群会启动 Driver 和 Executor 两种 JVM 进程,前者为主控进程,负责创建 Spark 上下文,提交 Spark 作业(Job),并将作业转化为计算任务(Task),在各个 Executor 进程间协调任务的调度,后者负责在工作节点上执行具体的计算任务,并将结果返回给 Driver,同时为需要持久化的 RDD 提供存储功能。由于 Driver 的内存管理相对来说较为简单,本文主要对 Executor 的内存管理进行分析,下文中的 Spark 内存均特指 Executor 的内存。

13.1 堆内和堆外内存规划

    作为一个 JVM 进程,Executor 的内存管理建立在 JVM 的内存管理之上,Spark 对 JVM 的堆内(On-heap)空间进行了更为详细的分配,以充分利用内存。同时,Spark 引入了堆外(Off-heap)内存,使之可以直接在工作节点的系统内存中开辟空间,进一步优化了内存的使用。堆内和堆外内存示意图如下:
EtvZQJ.png

13.1.1 堆内内存

    堆内内存的大小,由 Spark 应用程序启动时的 -executor-memory 或 spark.executor.memory 参数配置。Executor 内运行的并发任务共享 JVM 堆内内存,这些任务在缓存 RDD 数据和广播(Broadcast)数据时占用的内存被规划为存储(Storage)内存,而这些任务在执行 Shuffle 时占用的内存被规划为执行(Execution)内存,剩余的部分不做特殊规划,那些 Spark 内部的对象实例,或者用户定义的 Spark 应用程序中的对象实例,均占用剩余的空间。不同的管理模式下,这三部分占用的空间大小各不相同(下面第 2 小节会进行介绍)。
    Spark 对堆内内存的管理是一种逻辑上的规划式的管理,因为对象实例占用内存的申请和释放都由 JVM 完成,Spark 只能在申请后和释放前记录这些内存,我们来看其具体流程:
    申请内存:
    1)Spark 在代码中 new 一个对象实例
    2)JVM 从堆内内存分配空间,创建对象并返回对象引用
    3)Spark 保存该对象的引用,记录该对象占用的内存
    释放内存:
    1)Spark 记录该对象释放的内存,删除该对象的引用
    2)等待 JVM 的垃圾回收机制释放该对象占用的堆内内存

    我们知道,JVM 的对象可以以序列化的方式存储,序列化的过程是将对象转换为二进制字节流,本质上可以理解为将非连续空间的链式存储转化为连续空间或块存储,在访问时则需要进行序列化的逆过程--反序列化,将字节流转化为对象,序列化的方式可以节省存储空间,但增加了存储和读取时候的计算开销。
    对于 Spark 中序列化的对象,由于是字节流的形式,其占用的内存大小可直接计算,而对于非序列化的对象,其占用的内存是通过周期性地采样近似估算而得,即并不是每次新增的数据项都会计算一次占用的内存大小,这种方法降低了时间开销但是有可能误差较大,导致某一时刻的实际内存有可能远远超出预期。此外,在被 Spark 标记为释放的对象实例,很有可能在实际上并没有被 JVM 回收,导致实际可用的内存小于 Spark 记录的可用内存。所以 Spark 并不能准确记录实际可用的堆内内存,从而也就无法完全避免内存溢出(OOM, Out of Memory)的异常。
    虽然不能精准控制堆内内存的申请和释放,但 Spark 通过对存储内存和执行内存各自独立的规划管理,可以决定是否要在存储内存里缓存新的 RDD,以及是否为新的任务分配执行内存,在一定程度上可以提升内存的利用率,减少异常的出现。

13.1.2 堆外内存

    为了进一步优化内存的使用以及提高 Shuffle 时排序的效率,Spark 引入了堆外(Off-heap)内存,使之可以直接在工作节点的系统内存中开辟空间,存储经过序列化的二进制数据。利用 JDK Unsafe API(从 Spark 2.0 开始,在管理堆外的存储内存时不再基于 Tachyon,而是与堆外的执行内存一样,基于 JDK Unsafe API 实现),Spark 可以直接操作系统堆外内存,减少了不必要的内存开销,以及频繁的 GC 扫描和回收,提升了处理性能。堆外内存可以被精确地申请和释放,而且序列化的数据占用的空间可以被精确计算,所以相比堆内内存来说降低了管理的难度,也降低了误差。
    在默认情况下堆外内存并不启用,可通过配置 spark.memory.offHeap.enabled 参数启用,并由 spark.memory.offHeap.size 参数设定堆外空间的大小。除了没有 other 空间,堆外内存与堆内内存的划分方式相同,所有运行中的并发任务共享存储内存和执行内存。

13.1.3 内存管理接口

    Spark 为存储内存和执行内存的管理提供了统一的接口--MemoryManager,同一个 Executor 内的任务都调用这个接口的方法来申请或释放内存:

内存管理接口的主要方法:

// 申请存储内存
def acquireStorageMemory(blockId: BlockId, numBytes: Long, memoryMode: MemoryMode): Boolean
// 申请展开内存
def acquireUnrollMemory(blockId: BlockId, numBytes: Long, memoryMode: MemoryMode): Boolean
// 申请执行内存
def acquireExecutionMemory(numBytes: Long, taskAttemptId: Long, memoryMode: MemoryMode): Long
// 释放存储内存
def releaseStorageMemory(numBytes: Long, memoryMode: MemoryMode): Unit
// 释放执行内存
def releaseExecutionMemory(numBytes: Long, taskAttemptId: Long, memoryMode: MemoryMode): Unit
// 释放展开内存
def releaseUnrollMemory(numBytes: Long, memoryMode: MemoryMode): Unit

Spark的内存管理 – 内存管理接口
Etvey9.png
    我们看到,在调用这些方法时都需要指定其内存模式(MemoryMode),这个参数决定了是在堆内还是堆外完成这次操作。MemoryManager 的具体实现上,Spark 1.6 之后默认为统一管理(Unified Memory Manager)方式,1.6 之前采用的静态管理(Static Memory Manager)方式仍被保留,可通过配置 spark.memory.useLegacyMode 参数启用。两种方式的区别在于对空间分配的方式,下面的第 2 小节会分别对这两种方式进行介绍。

13.2 内存空间分配
13.2.1 静态内存管理

    在 Spark 最初采用的静态内存管理机制下,存储内存、执行内存和其他内存的大小在 Spark 应用程序运行期间均为固定的,但用户可以应用程序启动前进行配置。

静态内存管理图示--堆内
EtvKdx.png
可以看到,可用的堆内内存的大小需要按照下面的方式计算:

可用堆内内存空间:

可用的存储内存 = systemMaxMemory * spark.storage.memoryFraction * spark.storage.safetyFraction
可用的执行内存 = systemMaxMemory * spark.shuffle.memoryFraction * spark.shuffle.safetyFraction

    其中 systemMaxMemory 取决于当前 JVM 堆内内存的大小,最后可用的执行内存或者存储内存要在此基础上与各自的 memoryFraction 参数和 safetyFraction 参数相乘得出。上述计算公式中的两个 safetyFraction 参数,其意义在于在逻辑上预留出 1-safetyFraction 这么一块保险区域,降低因实际内存超出当前预设范围而导致 OOM 的风险(上文提到,对于非序列化对象的内存采样估算会产生误差)。值得注意的是,这个预留的保险区域仅仅是一种逻辑上的规划,在具体使用时 Spark 并没有区别对待,和 “其它内存” 一样交给了 JVM 去管理。

    堆外的空间分配较为简单,只有存储内存和执行内存,如下图所示。可用的执行内存和存储内存占用的空间大小直接由参数 spark.memory.storageFraction 决定,由于堆外内存占用的空间可以被精确计算,所以无需再设定保险区域。

静态内存管理图示--堆外
Etvue1.png
    静态内存管理机制实现起来较为简单,但如果用户不熟悉 Spark 的存储机制,或没有根据具体的数据规模和计算任务或做相应的配置,很容易造成 “一半海水,一半火焰” 的局面,即存储内存和执行内存中的一方剩余大量的空间,而另一方却早早被占满,不得不淘汰或移出旧的内容以存储新的内容。由于新的内存管理机制的出现,这种方式目前已经很少有开发者使用,出于兼容旧版本的应用程序的目的,Spark 仍然保留了它的实现。

13.2.2 统一内存管理

    Spark 1.6 之后引入的统一内存管理机制,与静态内存管理的区别在于存储内存和执行内存共享同一块空间,可以动态占用对方的空闲区域。

统一内存管理图示--堆内
EtvMo6.png
统一内存管理图示--堆外
EtvJQH.png
    其中最重要的优化在于动态占用机制,其规则如下:
    1)设定基本的存储内存和执行内存区域(spark.storage.storageFraction 参数),该设定确定了双方各自拥有的空间的范围。
    2)双方的空间都不足时,则存储到硬盘;若己方空间不足而对方空余时,可借用对方的空间;(存储空间不足是指不足以放下一个完整的 Block)。
    3)执行内存的空间被对方占用后,可让对方将占用的部分转存到硬盘,然后 “归还” 借用的空间。
    4)存储内存的空间被对方占用后,无法让对方 “归还”,因为需要考虑 Shuffle 过程中的很多因素,实现起来较为复杂。

动态占用机制图示
Etv3WD.png
    凭借统一内存管理机制,Spark 在一定程度上提高了堆内和堆外内存资源的利用率,降低了开发者维护 Spark 内存的难度,但并不意味着开发者可以高枕无忧。譬如,所以如果存储内存的空间太大或者说缓存的数据过多,反而会导致频繁的全量垃圾回收,降低任务执行时的性能,因为缓存的 RDD 数据通常都是长期驻留内存的。所以要想充分发挥 Spark 的性能,需要开发者进一步了解存储内存和执行内存各自的管理方式和实现原理。

13.3 存储内存管理
13.3.1 RDD 的持久化机制


    弹性分布式数据集(RDD)作为 Spark 最根本的数据抽象,是只读的分区记录(Partition)的集合,只能基于在稳定物理存储中的数据集上创建,或者在其他已有的 RDD 上执行转换(Transformation)操作产生一个新的 RDD。转换后的 RDD 与原始的 RDD 之间产生的依赖关系,构成了血统(Lineage)。凭借血统,Spark 保证了每一个 RDD 都可以被重新恢复。但 RDD 的所有转换都是惰性的,即只有当一个返回结果给 Driver 的行动(Action)发生时,Spark 才会创建任务读取 RDD,然后真正触发转换的执行。
    Task 在启动之初读取一个分区时,会先判断这个分区是否已经被持久化,如果没有则需要检查 Checkpoint 或按照血统重新计算。所以如果一个 RDD 上要执行多次行动,可以在第一次行动中使用 persist 或 cache 方法,在内存或磁盘中持久化或缓存这个 RDD,从而在后面的行动时提升计算速度。事实上,cache 方法是使用默认的 MEMORY_ONLY 的存储级别将 RDD 持久化到内存,故缓存是一种特殊的持久化。堆内和堆外存储内存的设计,便可以对缓存 RDD 时使用的内存做统一的规划和管理(存储内存的其他应用场景,如缓存 broadcast 数据,暂时不在本文的讨论范围之内)。
    RDD 的持久化由 Spark 的 Storage 模块负责,实现了 RDD 与物理存储的解耦合。Storage 模块负责管理 Spark 在计算过程中产生的数据,将那些在内存或磁盘、在本地或远程存取数据的功能封装了起来。在具体实现时 Driver 端和 Executor 端的 Storage 模块构成了主从式的架构,即 Driver 端的 BlockManager 为 Master,Executor 端的 BlockManager 为 Slave。Storage 模块在逻辑上以 Block 为基本存储单位,RDD 的每个 Partition 经过处理后唯一对应一个 Block(BlockId 的格式为 rdd_RDD-ID_PARTITION-ID )。Master 负责整个 Spark 应用程序的 Block 的元数据信息的管理和维护,而 Slave 需要将 Block 的更新等状态上报到 Master,同时接收 Master 的命令,例如新增或删除一个 RDD。

Storage 模块示意图
Etv1JO.png
在对 RDD 持久化时,Spark 规定了 MEMORY_ONLY、MEMORY_AND_DISK 等 7 种不同的 存储级别,而存储级别是以下 5 个变量的组合:
存储级别

class StorageLevel private(
  private var _useDisk: Boolean,        // 磁盘
  private var _useMemory: Boolean,      // 这里其实是指堆内内存
  private var _useOffHeap: Boolean,     // 堆外内存
  private var _deserialized: Boolean,   // 是否为非序列化
  private var _replication: Int = 1     // 副本个数
)

通过对数据结构的分析,可以看出存储级别从三个维度定义了 RDD 的 Partition(同时也就是 Block)的存储方式:

    1)存储位置:磁盘/堆内内存/堆外内存。如 MEMORY_AND_DISK 是同时在磁盘和堆内内存上存储,实现了冗余备份。OFF_HEAP 则是只在堆外内存存储,目前选择堆外内存时不能同时存储到其他位置。
    2)存储形式:Block 缓存到存储内存后,是否为非序列化的形式。如 MEMORY_ONLY 是非序列化方式存储,OFF_HEAP 是序列化方式存储。
    3)副本数量:大于 1 时需要远程冗余备份到其他节点。如 DISK_ONLY_2 需要远程备份 1 个副本。

13.3.2 RDD 缓存的过程

    RDD 在缓存到存储内存之前,Partition 中的数据一般以迭代器(Iterator)的数据结构来访问,这是 Scala 语言中一种遍历数据集合的方法。通过 Iterator 可以获取分区中每一条序列化或者非序列化的数据项 (Record),这些 Record 的对象实例在逻辑上占用了 JVM 堆内内存的 other 部分的空间,同一 Partition 的不同 Record 的空间并不连续。
    RDD 在缓存到存储内存之后,Partition 被转换成 Block,Record 在堆内或堆外存储内存中占用一块连续的空间。将 Partition 由不连续的存储空间转换为连续存储空间的过程,Spark 称之为 “展开”(Unroll)。Block 有序列化和非序列化两种存储格式,具体以哪种方式取决于该 RDD 的存储级别。非序列化的 Block 以一种 DeserializedMemoryEntry 的数据结构定义,用一个数组存储所有的对象实例,序列化的 Block 则以 SerializedMemoryEntry 的数据结构定义,用字节缓冲区(ByteBuffer)来存储二进制数据。每个 Executor 的 Storage 模块用一个链式 Map 结构(LinkedHashMap)来管理堆内和堆外存储内存中所有的 Block 对象的实例,对这个 LinkedHashMap 新增和删除间接记录了内存的申请和释放。
    因为不能保证存储空间可以一次容纳 Iterator 中的所有数据,当前的计算任务在 Unroll 时要向 MemoryManager 申请足够的 Unroll 空间来临时占位,空间不足则 Unroll 失败,空间足够时可以继续进行。对于序列化的 Partition,其所需的 Unroll 空间可以直接累加计算,一次申请。而非序列化的 Partition 则要在遍历 Record 的过程中依次申请,即每读取一条 Record,采样估算其所需的 Unroll 空间并进行申请,空间不足时可以中断,释放已占用的 Unroll 空间。如果最终 Unroll 成功,当前 Partition 所占用的 Unroll 空间被转换为正常的缓存 RDD 的存储空间,如下图所示。

Spark Unroll 示意图
EtvlFK.png
    在静态内存管理时,Spark 在存储内存中专门划分了一块 Unroll 空间,其大小是固定的,统一内存管理时则没有对 Unroll 空间进行特别区分,当存储空间不足时会根据动态占用机制进行处理。

13.3.3 淘汰和落盘

    由于同一个 Executor 的所有的计算任务共享有限的存储内存空间,当有新的 Block 需要缓存但是剩余空间不足且无法动态占用时,就要对 LinkedHashMap 中的旧 Block 进行淘汰(Eviction),而被淘汰的 Block 如果其存储级别中同时包含存储到磁盘的要求,则要对其进行落盘(Drop),否则直接删除该 Block。
    存储内存的淘汰规则为:
    1)被淘汰的旧 Block 要与新 Block 的 MemoryMode 相同,即同属于堆外或堆内内存。
    2)新旧 Block 不能属于同一个 RDD,避免循环淘汰。
    3)旧 Block 所属 RDD 不能处于被读状态,避免引发一致性问题。
    4)遍历 LinkedHashMap 中 Block,按照最近最少使用(LRU)的顺序淘汰,直到满足新 Block 所需的空间。其中 LRU 是 LinkedHashMap 的特性。
    落盘的流程则比较简单,如果其存储级别符合_useDisk 为 true 的条件,再根据其 _deserialized 判断是否是非序列化的形式,若是则对其进行序列化,最后将数据存储到磁盘,在 Storage 模块中更新其信息。

13.4 执行内存管理
13.4.1 多任务间内存分配


    Executor 内运行的任务同样共享执行内存,Spark 用一个 HashMap 结构保存了任务到内存耗费的映射。每个任务可占用的执行内存大小的范围为 1/2N ~ 1/N,其中 N 为当前 Executor 内正在运行的任务的个数。每个任务在启动之时,要向 MemoryManager 请求申请最少为 1/2N 的执行内存,如果不能被满足要求则该任务被阻塞,直到有其他任务释放了足够的执行内存,该任务才可以被唤醒。

13.4.2 Shuffle 的内存占用

    执行内存主要用来存储任务在执行 Shuffle 时占用的内存,Shuffle 是按照一定规则对 RDD 数据重新分区的过程,我们来看 Shuffle 的 Write 和 Read 两阶段对执行内存的使用:
    Shuffle Write
    1)若在 map 端选择普通的排序方式,会采用 ExternalSorter 进行外排,在内存中存储数据时主要占用堆内执行空间。
    2)若在 map 端选择 Tungsten 的排序方式,则采用 ShuffleExternalSorter 直接对以序列化形式存储的数据排序,在内存中存储数据时可以占用堆外或堆内执行空间,取决于用户是否开启了堆外内存以及堆外执行内存是否足够。
    Shuffle Read
    1)在对 reduce 端的数据进行聚合时,要将数据交给 Aggregator 处理,在内存中存储数据时占用堆内执行空间。
    2)如果需要进行最终结果排序,则要将再次将数据交给 ExternalSorter 处理,占用堆内执行空间。

    在 ExternalSorter 和 Aggregator 中,Spark 会使用一种叫 AppendOnlyMap 的哈希表在堆内执行内存中存储数据,但在 Shuffle 过程中所有数据并不能都保存到该哈希表中,当这个哈希表占用的内存会进行周期性地采样估算,当其大到一定程度,无法再从 MemoryManager 申请到新的执行内存时,Spark 就会将其全部内容存储到磁盘文件中,这个过程被称为溢存(Spill),溢存到磁盘的文件最后会被归并(Merge)。
    Shuffle Write 阶段中用到的 Tungsten 是 Databricks 公司提出的对 Spark 优化内存和 CPU 使用的计划,解决了一些 JVM 在性能上的限制和弊端。Spark 会根据 Shuffle 的情况来自动选择是否采用 Tungsten 排序。Tungsten 采用的页式内存管理机制建立在 MemoryManager 之上,即 Tungsten 对执行内存的使用进行了一步的抽象,这样在 Shuffle 过程中无需关心数据具体存储在堆内还是堆外。每个内存页用一个 MemoryBlock 来定义,并用 Object obj 和 long offset 这两个变量统一标识一个内存页在系统内存中的地址。堆内的 MemoryBlock 是以 long 型数组的形式分配的内存,其 obj 的值为是这个数组的对象引用,offset 是 long 型数组的在 JVM 中的初始偏移地址,两者配合使用可以定位这个数组在堆内的绝对地址;堆外的 MemoryBlock 是直接申请到的内存块,其 obj 为 null,offset 是这个内存块在系统内存中的 64 位绝对地址。Spark 用 MemoryBlock 巧妙地将堆内和堆外内存页统一抽象封装,并用页表(pageTable)管理每个 Task 申请到的内存页。

    Tungsten 页式管理下的所有内存用 64 位的逻辑地址表示,由页号和页内偏移量组成:
    页号:占 13 位,唯一标识一个内存页,Spark 在申请内存页之前要先申请空闲页号。
    页内偏移量:占 51 位,是在使用内存页存储数据时,数据在页内的偏移地址。
    有了统一的寻址方式,Spark 可以用 64 位逻辑地址的指针定位到堆内或堆外的内存,整个 Shuffle Write 排序的过程只需要对指针进行排序,并且无需反序列化,整个过程非常高效,对于内存访问效率和 CPU 使用效率带来了明显的提升。

    Spark 的存储内存和执行内存有着截然不同的管理方式:对于存储内存来说,Spark 用一个 LinkedHashMap 来集中管理所有的 Block,Block 由需要缓存的 RDD 的 Partition 转化而成;而对于执行内存,Spark 用 AppendOnlyMap 来存储 Shuffle 过程中的数据,在 Tungsten 排序中甚至抽象成为页式内存管理,开辟了全新的 JVM 内存管理机制。


第14章 部署模式解析
14.1 部署模式概述

    Spark 支持的主要的三种分布式部署方式分别是 standalone、spark on mesos 和 spark on YARN。standalone 模式,即独立模式,自带完整的服务,可单独部署到一个集群中,无需依赖任何其他资源管理系统。它是 Spark 实现的资源调度框架,其主要的节点有 Client 节点、Master 节点和 Worker 节点。而 yarn 是统一的资源管理机制,在上面可以运行多套计算框架,如 map reduce、storm 等根据 driver 在集群中的位置不同,分为 yarn client 和 yarn cluster。而 mesos 是一个更强大的分布式资源管理框架,它允许多种不同的框架部署在其上,包括 yarn。基本上,Spark 的运行模式取决于传递给 SparkContext 的 MASTER 环境变量的值,个别模式还需要辅助的程序接口来配合使用,目前支持的 Master 字符串及 URL 包括:
   
EtvYyd.png
    用户在提交任务给 Spark 处理时,以下两个参数共同决定了 Spark 的运行方式:
    &#8226; --master MASTER_URL :决定了 Spark 任务提交给哪种集群处理。
    &#8226; --deploy-mode DEPLOY_MODE :决定了 Driver 的运行方式,可选值为 Client 或者 Cluster。

     

14.2 standalone 框架

    standalone 集群由三个不同级别的节点组成,分别是:
    1)Master 主控节点,可以类比为董事长或总舵主,在整个集群之中,最多只有一个 Master 处在 Active 状态。
    2)Worker 工作节点,这个是 manager,是分舵主, 在整个集群中,可以有多个 Worker,如果 Worker 为零,什么事也做不了。
    3)Executor 干苦力活的,直接受 Worker 掌控,一个 Worker 可以启动多个 executor,启动的个数受限于机器中的 cpu 核数。
    这三种不同类型的节点各自运行于自己的JVM进程之中。

    Standalone 模式下,集群启动时包括 Master 与 Worker,其中 Master 负责接收客户端提交的作业,管理 Worker。根据作业提交的方式不同,分为 driver on client 和 drvier on worker。如下图所示,上图为 driver on client 模式,下图为 driver on work 模式。两种模式的主要不同点在于 driver 所在的位置。
    在 standalone 部署模式下又分为 client 模式和 cluster 模式。
    在client 模式下,driver 和 client 运行于同一 JVM 中,不由 worker 启动,该 JVM 进程直到 spark application 计算完成返回结果后才退出。如下图所示:
   
EtvGSe.png
    而在 cluster 模式下,driver 由 worker 启动,client 在确认 spark application 成功提交给 cluster 后直接退出,并不等待 spark application 运行结果返回。如下图所示:
     
EtvtOA.png
    从部署图来进行分析,每个 JVM 进程在启动时的文件依赖如何得到满足。
    1)Master 进程最为简单,除了 spark jar 包之外,不存在第三方库依赖。
    2)Driver 和 Executor 在运行的时候都有可能存在第三方包依赖,分开来讲。
    3)Driver 比较简单,spark-submit 在提交的时候会指定所要依赖的 jar 文件从哪里读取。
    4)Executor 由 Worker 来启动,Worker 需要下载 Executor 启动时所需要的 jar 文件,那么从哪里下载呢?
Etv0Ff.png
    Spark Standalone 模式,即独立模式,自带完整的服务,可单独部署到一个集群中,无需依赖其他资源管理系统。在该模式下,用户可以通过手动启动 Master 和 Worker 来启动一个独立的集群。其中,Master 充当了资源管理的角色,Workder 充当了计算节点的角色。在该模式下,Spark Driver 程序在客户端(Client)运行,而 Executor 则在 Worker 节点上运行。以下是一个运行在 Standalone 模式下,包含一个 Master 节点,两个 Worker 节点的 Spark 任务调度交互部署架构图。
Etvawt.png
    从上面的 Spark 任务调度过程可以看到:
    1)整个集群分为 Master 节点和 Worker 节点,其 Driver 程序运行在客户端。Master 节点负责为任务分配 Worker 节点上的计算资源,两者会通过相互通信来同步资源状态,见途中红色双向箭头。
    2)客户端启动任务后会运行 Driver 程序,Driver 程序中会完成 SparkContext 对象的初始化,并向 Master 进行注册。
    3)每个 Workder 节点上会存在一个或者多个 ExecutorBackend 进程。每个进程包含一个 Executor 对象,该对象持有一个线程池,每个线程池可以执行一个任务(Task)。ExecutorBackend 进程还负责跟客户端节点上的 Driver 程序进行通信,上报任务状态。

     

14.2.1 Standalone 模式下任务运行过程


    上面的过程反映了 Spark 在 standalone 模式下,整体上客户端、Master 和 Workder 节点之间的交互。对于一个任务的具体运行过程需要更细致的分解,分解运行过程见图中的小字。
    1) 用户通过 bin/spark-submit 部署工具或者 bin/spark-class 启动应用程序的 Driver 进程,Driver 进程会初始化 SparkContext 对象,并向 Master 节点进行注册。
    &#8226; 1、Master 节点接受 Driver 程序的注册,检查它所管理的 Worker 节点,为该 Driver 程序分配需要的计算资源 Executor。Worker 节点完成 Executor 的分配后,向 Master 报告 Executor 的状态。
    &#8226; 2、Worker 节点上的 ExecutorBackend 进程启动后,向 Driver 进程注册。
    2) Driver 进程内部通过 DAG Schaduler、Stage Schaduler、Task Schaduler 等过程完成任务的划分后,向 Worker 节点上的 ExecutorBackend 分配 TASK。
    &#8226; 1、ExecutorBackend 进行 TASK 计算,并向 Driver 报告 TASK 状态,直至结束。
    &#8226; 2、Driver 进程在所有 TASK 都处理完成后,向 Master 注销。

14.2.2 总结

    Spark 能够以 standalone 模式运行,这是 Spark 自身提供的运行模式,用户可以通过手动启动 master 和 worker 进程来启动一个独立的集群,也可以在一台机器上运行这些守护进程进行测试。standalone 模式可以用在生产环境,它有效的降低了用户学习、测试 Spark 框架的成本。
    standalone 模式目前只支持跨应用程序的简单 FIFO 调度。然而,为了允许多个并发用户,你可以控制每个应用使用的资源的最大数。默认情况下,它会请求使用集群的全部 CUP 内核。
    缺省情况下,standalone 任务调度允许 worker 的失败(在这种情况下它可以将失败的任务转移给其他的 worker)。但是,调度器使用 master 来做调度,这会产生一个单点问题:如果 master 崩溃,新的应用不会被创建。为了解决这个问题,可以通过 zookeeper 的选举机制在集群中启动多个 master,也可以使用本地文件实现单节点恢复。

14.3 yarn 集群模式

    Apache yarn 是 apache Hadoop 开源项目的一部分。设计之初是为了解决 mapreduce 计算框架资源管理的问题。到 haodoop 2.0 使用 yarn 将 mapreduce 的分布式计算和资源管理区分开来。它的引入使得 Hadoop 分布式计算系统进入了平台化时代,即各种计算框架可以运行在一个集群中,由资源管理系统 YRAN 进行统一的管理和调度,从而共享整个集群资源、提高资源利用率。
    YARN 总体上也 Master/Slave 架构--ResourceManager/NodeManager。前者(RM)负责对各个 NodeManager(NM) 上的资源进行统一管理和调度。而 Container 是资源分配和调度的基本单位,其中封装了机器资源,如内存、CPU、磁盘和网络等,每个任务会被分配一个 Container,该任务只能在该 Container 中执行,并使用该 Container 封装的资源。NodeManager 的作用则是负责接收并启动应用的 Container、而向 RM 回报本节点上的应用 Container 运行状态和资源使用情况。ApplicationMaster 与具体的 Application 相关,主要负责同 ResourceManager 协商以获取合适的 Container,并跟踪这些 Container 的状态和监控其进度。如下图所示为 yarn 集群的一般模型。
    简单架构图如下:
   
ENSR0A.png
    详细架构图如下:
     
EtvBY8.png
    Spark 在 yarn 集群上的部署方式分为两种,yarn cluster(driver 运行在 master 上)和 yarn client(driver 运行在 client 上)。
    driver on master 如下图所示:
   
EtvcOs.png
    &#8226; (1) Spark Yarn Client 向 YARN 中提交应用程序,包括 Application Master 程序、启动 Application Master 的命令、需要在 Executor 中运行的程序等。
    &#8226; (2) Resource manager 收到请求后,在其中一个 Node Manager 中为应用程序分配一个 Container,要求它在 Container 中启动应用程序的 Application Master,Application Master 初始化 sparkContext 以及创建 DAG Scheduler 和 Task Scheduler。
    &#8226; (3) Application Master 根据 SparkContext 中的配置,向 Resource Manager 申请 Container,同时,Application Master 向 Resource Manager 注册,这样用户可通过 Resource Manager 查看应用程序的运行状态。
    &#8226; (4) Resource Manager 在集群中寻找符合条件的 Node Manager,在 Node Manager 启动 Container,要求 Container 启动 Executor。
    &#8226; (5) Executor 启动后向 Application Master 注册,并接收 Application Master 分配的 Task。
    &#8226; (6) 应用程序运行完成后,Application Master 向 Resource Manager 申请注销并关闭自己。
    driver on client 如下图所示:
EtvdTP.png
    &#8226; (1) Spark Yarn Client 向 YARN 的 Resource Manager 申请启动 Application Master。同时在 SparkContent 初始化中将创建 DAG Scheduler 和 Task Scheduler 等。
    &#8226; (2) ResourceManager 收到请求后,在集群中选择一个 NodeManager,为该应用程序分配第一个 Container,要求它在这个 Container 中启动应用程序的 ApplicationMaster,与 YARN-Cluster 区别的是在该 ApplicationMaster 不运行 SparkContext,只与 SparkContext 进行联系进行资源的分派。
    &#8226; (3) Client 中的 SparkContext 初始化完毕后,与 Application Master 建立通讯,向 Resource Manager 注册,根据任务信息向 Resource Manager 申请资源 (Container)。
    &#8226; (4) 当 Application Master 申请到资源后,便与 Node Manager 通信,要求它启动 Container。
    &#8226; (5) Container 启动后向 Driver 中的 SparkContext 注册,并申请 Task。
    &#8226; (6) 应用程序运行完成后,Client 的 SparkContext 向 ResourceManage r申请注销并关闭自己。

     

    Yarn-client 和Yarn cluster 模式对比可以看出,在 Yarn-client(Driver on client)中,Application Master 仅仅从 Yarn 中申请资源给 Executor,之后 client 会跟 container 通信进行作业的调度。如果 client 离集群距离较远,建议不要采用此方式,不过此方式有利于交互式的作业。  
   
EtvylQ.png
    Spark 能够以集群的形式运行,可用的集群管理系统有 Yarn、Mesos 等。集群管理器的核心功能是资源管理和任务调度。以 Yarn 为例,Yarn 以 Master/Slave 模式工作,在 Master 节点运行的是 Resource Manager(RM),负责管理整个集群的资源和资源分配。在 Slave 节点运行的 Node Manager(NM),是集群中实际拥有资源的工作节点。我们提交 Job 以后,会将组成 Job 的多个 Task 调度到对应的 Node Manager 上进行执行。另外,在 Node Manager 上将资源以 Container 的形式进行抽象,Container 包括两种资源 内存 和 CPU。
    以下是一个运行在 Yarn 集群上,包含一个 Resource Manager 节点,三个 Node Manager 节点(其中,两个是 Worker 节点,一个 Master 节点)的 Spark 任务调度交换部署架构图。 
EtvDfS.png
    从上面的Spark任务调度过程图可以看到:
    1)整个集群分为 Master 节点和 Worker 节点,它们都存在于 Node Manager 节点上,在客户端提交任务时由 Resource Manager 统一分配,运行 Driver 程序的节点被称为 Master 节点,执行具体任务的节点被称为 Worder 节点。Node Manager 节点上资源的变化都需要及时更新给 Resource Manager,见图中红色双向箭头。
    2)Master 节点上常驻 Master 守护进程 -- Driver 程序,Driver 程序中会创建 SparkContext对 象,并负责跟各个 Worker 节点上的 ExecutorBackend 进程进行通信,管理 Worker 节点上的任务,同步任务进度。实际上,在 Yarn 中 Node Manager 之间的关系是平等的,因此 Driver 程序会被调度到任何一个 Node Manager 节点。
    3)每个 Worker 节点上会存在一个或者多个 ExecutorBackend 进程。每个进程包含一个 Executor 对象,该对象持有一个线程池,每个线程池可以执行一个任务(Task)。ExecutorBackend 进程还负责跟 Master 节点上的 Driver 程序进行通信,上报任务状态。  


集群下任务运行过程

    上面的过程反映出了 Spark 在集群模式下,整体上 Resource Manager 和 Node Manager 节点间的交互,Master 和 Worker 之间的交互。对于一个任务的具体运行过程需要更细致的分解,分解运行过程见图中的小字。
    &#8226; 1) 用户通过 bin/spark-submit 部署工具或者 bin/spark-class 向 Yarn 集群提交应用程序。
    &#8226; 2) Yarn 集群的 Resource Manager 为提交的应用程序选择一个 Node Manager 节点并分配第一个 Container,并在该节点的 Container 上启动 SparkContext 对象。
    &#8226; 3) SparkContext 对象向 Yarn 集群的 Resource Manager 申请资源以运行 Executor。
    &#8226; 4) Yarn 集群的 Resource Manager 分配 Container 给 SparkContext 对象,SparkContext 和相关的 Node Manager 通讯,在获得的 Container 上启动 ExecutorBackend 守护进程,ExecutorBackend 启动后开始向 SparkContext 注册并申请 Task。
    &#8226; 5) SparkContext 分配 Task 给 ExecutorBackend 执行。
    &#8226; 6) ExecutorBackend 开始执行 Task,并及时向 SparkContext 汇报运行状况。Task 运行完毕,SparkContext 归还资源给 Node Manager,并注销退。

14.4 mesos 集群模式

    Mesos 是 apache 下的开源分布式资源管理框架。起源于加州大学伯克利分校,后被 Twitter 推广使用。Mesos 上可以部署多种分布式框架,Mesos 的架构图如下图所示,其中 Framework 是指外部的计算框架,如 Hadoop、Mesos 等,这些计算框架可通过注册的方式接入 Mesos,以便 Mesos 进行统一管理和资源分配。
   
EtvWT0.png
    在 Mesos 上运行的 Framework 由两部分组成:一个是 scheduler ,通过注册到 Master 来获取集群资源。另一个是在 Slave 节点上运行的 executor 进程,它可以执行 Framework 的 task 。 Master 决定为每个 Framework 提供多少资源,Framework 的 scheduler 来选择其中提供的资源。当 Framework 同意了提供的资源,它通过 Master 将 task 发送到提供资源的 Slaves 上运行。Mesos c的资源分配图如下图所示:
EtvsSg.png
    (1) Slave1 向 Master 报告,有 4 个 CPU 和 4 GB 内存可用。
    (2) Master 发送一个 Resource Offer 给 Framework1 来描述 Slave1 有多少可用资源。
    (3) FrameWork1 中的 FW Scheduler 会答复 Master,我有两个 Task 需要运行在 Slave1,一个 Task 需要 <2个CPU,1 GB内存="">,另外一个 Task 需要 <1个CPU,2 GB内存="">。
    (4) 最后,Master 发送这些 Tasks 给 Slave1。然后,Slave1 还有 1 个 CPU 和 1GB 内存没有使用,所以分配模块可以把这些资源提供给 Framework2。

    Spark 可作为其中一个分布式框架部署在 mesos 上,部署图与 mesos 的一般框架部署图类似,如下图所示,这里不再重述。
EtvhkV.png
14.5 spark 三种部署模式的区别

    在这三种部署模式中,standalone 作为 spark 自带的分布式部署模式,是最简单也是最基本的 spark 应用程序部署模式,这里就不再赘述。这里就讲一下 yarn 和 mesos 的区别:
    (1) 就两种框架本身而言,mesos上可部署 yarn 框架。而 yarn 是更通用的一种部署框架,而且技术较成熟。
    (2) mesos 双层调度机制,能支持多种调度模式,而 yarn 通过 Resource Mananger 管理集群资源,只能使用一种调度模式。Mesos 的双层调度机制为:mesos 可接入如 yarn 一般的分布式部署框架,但 Mesos 要求可接入的框架必须有一个调度器模块,该调度器负责框架内部的任务调度。当一个 Framework 想要接入 mesos 时,需要修改自己的调度器,以便向 mesos 注册,并获取 mesos 分配给自己的资源,这样再由自己的调度器将这些资源分配给框架中的任务,也就是说,整个 mesos 系统采用了双层调度框架:第一层,由 mesos 将资源分配给框架;第二层,框架自己的调度器将资源分配给自己内部的任务。
    (3) mesos 可实现粗、细粒度资源调度,可动态分配资源,而 yarn 只能实现静态资源分配。其中粗粒度和细粒度调度定义如下:
    粗粒度模式(Coarse-grained Mode):程序运行之前就要把所需要的各种资源(每个 executor 占用多少资源,内部可运行多少个 executor)申请好,运行过程中不能改变。
    细粒度模式(Fine-grained Mode):为了防止资源浪费,对资源进行按需分配。与粗粒度模式一样,应用程序启动时,先会启动 executor,但每个 executor 占用资源仅仅是自己运行所需的资源,不需要考虑将来要运行的任务,之后,mesos 会为每个 executor 动态分配资源,每分配一些,便可以运行一个新任务,单个 Task 运行完之后可以马上释放对应的资源。每个 Task 会汇报状态给 Mesos Slave 和 Mesos Master,便于更加细粒度管理和容错,这种调度模式类似于 MapReduce 调度模式,每个 Task 完全独立,优点是便于资源控制和隔离,但缺点也很明显,短作业运行延迟大。
    从 yarn 和 mesos 的区别可看出,它们各自有优缺点。因此实际使用中,选择哪种框架,要根据本公司的实际需要而定,可考虑现有的大数据生态环境。如我司采用 yarn 部署 spark,原因是,我司早已有较成熟的 hadoop 的框架,考虑到使用的方便性,采用了 yarn 模式的部署。

14.6 异常场景分析

上面说明的是正常情况下,各节点的消息分发细节。那么如果在运行中,集群中的某些节点出现了问题,整个集群是否还能够正常处理 Application 中的任务呢?
14.6.1 异常分析1:Worker 异常退出
Etv6yj.png
在 Spark 运行过程中,经常碰到的问题就是 Worker 异常退出,当 Worker 退出时,整个集群会有哪些故事发生呢?请看下面的具体描述:
1)Worker 异常退出,比如说有意识的通过 kill 指令将 Worker 杀死。
2)Worker 在退出之前,会将自己所管控的所有小弟 Executor 全干掉。
3)Worker 需要定期向 Master 改善心跳消息的,现在 Worker 进程都已经玩完了,哪有心跳消息,所以 Master 会在超时处理中意识到有一个 “分舵” 离开了。
4)Master 非常伤心,伤心的 Master 将情况汇报给了相应的 Driver。
Driver 通过两方面确认分配给自己的 Executor 不幸离开了,一是 Master 发送过来的通知,二是 Driver 没有在规定时间内收到 Executor 的 StatusUpdate,于是 Driver 会将注册的 Executor 移除。

后果分析
Worker 异常退出会带来哪些影响:
1)Executor 退出导致提交的 Task 无法正常结束,会被再一次提交运行。
2)如果所有的 Worker 都异常退出,则整个集群不可用。
3)需要有相应的程序来重启 Worker 进程,比如使用 supervisord 或 runit。

测试步骤
1)启动 Master。
2)启动 Worker。
3)启动 spark-shell。
4)手工 kill 掉 Worker 进程。
5)用 jps 或 ps -ef | grep -i java 来查看启动着的 java 进程。

异常退出的代码处理
定义 ExecutorRunner.scala 的 start 函数

[mw_shl_code=scala,true]def start() {
  workerThread = new Thread("ExecutorRunner for " + fullId) {
    override def run() { fetchAndRunExecutor() }
  }
  workerThread.start()
  // Shutdown hook that kills actors on shutdown.
  shutdownHook = new Thread() {
    override def run() {
      killProcess(Some("Worker shutting down"))
    }
  }
  Runtime.getRuntime.addShutdownHook(shutdownHook)
}[/mw_shl_code]

killProcess 的过程就是停止相应 CoarseGrainedExecutorBackend 的过程。
Worker 停止的时候,一定要先将自己启动的 Executor 停止掉。这是不是很像水浒中宋江的手段,李逵就是这样不明不白的把命给丢了。

小结
需要特别指出的是,当 Worker 在启动 Executor 的时候,是通过 ExecutorRunner 来完成的,ExecutorRunner 是一个独立的线程,和 Executor 是一对一的关系,这很重要。Executor 作为一个独立的进程在运行,但会受到 ExecutorRunner 的严密监控。

14.6.2 异常分析2:Executor 异常退出
EtvRwq.png
后果分析
Executor 作为 Standalone 集群部署方式下的最底层员工,一旦异常退出,其后果会是什么呢?
1)Executor 异常退出,ExecutorRunner 注意到异常,将情况通过 ExecutorStateChanged 汇报给 Master。
2)Master 收到通知之后,非常不高兴,尽然有小弟要跑路,那还了得,要求 Executor 所属的 Worker 再次启动。
3)Worker 收到 LaunchExecutor 指令,再次启动 Executor。

测试步骤
1)启动 Master
2)启动 Worker
3)启动 spark-shell
4)手工 kill 掉 CoarseGrainedExecutorBackend

fetchAndRunExecutor
fetchAndRunExecutor 负责启动具体的 Executor,并监控其运行状态,具体代码逻辑如下所示

[mw_shl_code=scala,true]def fetchAndRunExecutor() {
  try {
    // Create the executor's working directory
    val executorDir = new File(workDir, appId + "/" + execId)
    if (!executorDir.mkdirs()) {
      throw new IOException("Failed to create directory " + executorDir)
    }

    // Launch the process
    val command = getCommandSeq
    logInfo("Launch command: " + command.mkString("\"", "\" \"", "\""))
    val builder = new ProcessBuilder(command: _*).directory(executorDir)
    val env = builder.environment()
    for ((key, value)  {
      logInfo("Runner thread for executor " + fullId + " interrupted")
      state = ExecutorState.KILLED
      killProcess(None)
    }
    case e: Exception => {
      logError("Error running executor", e)
      state = ExecutorState.FAILED
      killProcess(Some(e.toString))
    }
  }
}[/mw_shl_code]

14.6.3 异常分析3:Master 异常退出
Etv2mn.png
Worker 和 Executor 异常退出的场景都讲到了,我们剩下最后一种情况了,Master 挂掉了怎么办?

后果分析
带头大哥如果不在了,会是什么后果呢?
1)Worker 没有汇报的对象了,也就是如果 Executor 再次跑飞,Worker 是不会将 Executor 启动起来的,大哥没给指令。
2)无法向集群提交新的任务。
3)老的任务即便结束了,占用的资源也无法清除,因为资源清除的指令是 Master 发出的。

第15章 wordcount 程序运行原理窥探
15.1 spark 之 scala 实现 wordcount


在 spark 中使用 scala 来实现 wordcount(统计单词出现次数模型)更加简单,相对 java 代码上更加简洁,其函数式编程的思维逻辑也更加直观。

[mw_shl_code=scala,true]package com.spark.firstApp

import org.apache.spark.{SparkContext, SparkConf}

/**
  * scala 实现 wordcount
  */
object WordCount1 {
  def main(args: Array[String]) {
    if (args.length == 0) {
      System.err.println("Usage: WordCount1 <file1>")
      System.exit(1)
    }
    /**
      * 1、实例化 SparkConf
      * 2、构建 SparkContext,SparkContext 是 spark 应用程序的唯一入口
      * 3. 通过 SparkContext 的 textFile 方法读取文本文件
      */
    val conf = new SparkConf().setAppName("WordCount1").setMaster("local")
    val sc = new SparkContext(conf)

    /**
      * 4、通过 flatMap 对文本中每一行的单词进行拆分(分割符号为空格),并利用 map 进行函数转换形成 (K,V) 对形式,再进行 reduceByKey,打印输出 10 个结果
      *    函数式编程更加直观的反映思维逻辑
      */
    sc.textFile(args(0)).flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _).take(10).foreach(println)
    sc.stop()
  }
}[/mw_shl_code]

15.2 原理窥探

在 spark 集群中运行 wordcount 程序其主要业务逻辑比较简单,涵盖一下 3 个过程:
1)读取存储介质上的文本文件(一般存储在 hdfs 上);
2)对文本文件内容进行解析,按照单词进行分组统计汇总;
3)将过程 2 的分组结果保存到存储介质上。(一般存储在 hdfs 或者 RMDB 上)
虽然 wordcount 的业务逻辑非常简单,但其应用程序在 spark 中的运行过程却巧妙得体现了 spark 的核心精髓--分布式弹性数据集、内存迭代以及函数式编程等特点。下图对 spark 集群中 wordcount 的运行过程进行剖析,加深对 spark 技术原理窥探。
Etv4YT.png
该图横向分割下面给出了 wordcount 的 scala 核心程序实现,该程序在 spark 集群的运行过程涉及几个核心的 RDD,主要有 textFileRDD、flatMapRDD、mapToPairRDD、shuffleRDD(reduceByKey)等。
应用程序通过 textFile 方法读取 hdfs 上的文本文件,数据分片的形式以 RDD 为统一模式将数据加载到不同的物理节点上,如上图所示的节点 1、节点 2 到节点 n;并通过一系列的数据转换,如利用 flatMap 将文本文件中对应每行数据进行拆分(文本文件中单词以空格为分割符号),形成一个以每个单词为核心新的数据集合 RDD;之后通过 MapRDD 继续转换形成形成 (K,V) 对 数据形式,以便进一步使用 reduceByKey 方法,该方法会触发 shuffle 行为,促使不同的单词到对应的节点上进行汇聚统计(实际上在夸节点进行数据 shuffle 之前会在本地先对相同单词进行合并累加),形成 wordcount 的统计结果;最终通过 saveAsTextFile 方法将数据保存到 hdfs 上。具体的运行逻辑原理以及过程上图给出了详细的示意说明。

作者:黑泽君
来源:https://www.cnblogs.com/chenmingjun/p/10803261.html#commentform

最新经典文章,欢迎关注公众号


本帖被以下淘专辑推荐:

已有(1)人评论

跳转到指定楼层
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条