分享

有关Spark核心RDD的内幕真相

问题导读

1.RDD之间的依赖关系可以分为哪两类?
2.spark rdd有哪两种容错方式?
3.Spark中的RDD两种类型的操作,包括转换(Transformation)和动作(Action),转换得到什么内容,动作得到的是什么内容?



什么是RDD?

RDD,弹性分布式数据集,全称Resilient Distributed Datasets,是分布式内存的一个抽象概念,RDD提供了一种高度受限的共享内存模型,即RDD是只读的记录分区的集合,只能通过在其他RDD执行确定的转换操作(如map、join和group by)而创建,然而这些限制使得实现容错的开销很低。对开发者而言,RDD可以看作是Spark的一个对象,它本身运行于内存中,如读文件是一个RDD,对文件计算是一个RDD,结果集也是一个RDD ,不同的分片、 数据之间的依赖 、key-value类型的map数据都可以看做RDD。

RDD作为Spark的核心内容,在Spark的云寻觅文档中解释如下:RDD is a fault-tolerant collection of elements that can be operated on in parallel。由此可见,其中有两个关键词:fault-tolerant & in parallel。首先,容错性是RDD的一个重要特性;其次,它是并行计算的数据。

RDD具备像MapReduce等数据流模型的容错特性,并且允许开发人员在大型集群上执行基于内存的计算。现有的数据流系统对两种应用的处理并不高效:一是迭代式算法,这在图应用和机器学习领域很常见;二是交互式数据挖掘工具。这两种情况下,将数据保存在内存中能够极大地提高性能。为了有效地实现容错,RDD提供了一种高度受限的共享内存,即RDD是只读的,并且只能通过其他RDD上的批量操作来创建。尽管如此,RDD仍然足以表示很多类型的计算,包括MapReduce和专用的迭代编程模型(如Pregel)等 。

RDD可以让用户显式地将数据存储到磁盘和内存中,并能控制数据的分区。同时,RDD还提供了一组丰富的操作来操作这些数据。在这些操作中,诸如map、flatMap、filter等转换操作实现了monad模式,很好地契合了Scala的集合操作。除此之外,RDD还提供了诸如join、groupBy、reduceByKey等更为方便的操作,以支持常见的数据运算。

通常来讲,针对数据处理有几种常见模型,包括:Iterative Algorithms,Relational Queries,MapReduce,Stream Processing。例如Hadoop MapReduce采用了MapReduces模型,Storm则采用了Stream Processing模型。RDD混合了这四种模型,使得Spark可以应用于各种大数据处理场景。

RDD之间的依赖关系

RDD之间的依赖关系可以分为两类,即:

  • 窄依赖(narrow dependencies):子RDD的每个分区依赖于常数个父分区(即与数据规模无关);

  • 宽依赖(wide dependencies):子RDD的每个分区依赖于所有父RDD分区。例如,map产生窄依赖,而join则是宽依赖(除非父RDD被哈希分区)。


另一个例子见下图:

1.jpg

上图是窄依赖和宽依赖的例子。(方框表示RDD,实心矩形表示分区)

区分这两种依赖很有用。首先,窄依赖允许在一个集群节点上以流水线的方式(pipeline)计算所有父分区。例如,逐个元素地执行map、然后filter操作;而宽依赖则需要首先计算好所有父分区数据,然后在节点之间进行Shuffle,这与MapReduce类似。第二,窄依赖能够更有效地进行失效节点的恢复,即只需重新计算丢失RDD分区的父分区,而且不同节点之间可以并行计算;而对于一个宽依赖关系的Lineage图,单个节点失效可能导致这个RDD的所有祖先丢失部分分区,因而需要整体重新计算。

RDD对容错的支持

支持容错通常采用两种方式:数据复制或日志记录。对于以数据为中心的系统而言,这两种方式都非常昂贵,因为它需要跨集群网络拷贝大量数据,毕竟带宽的数据远远低于内存。

RDD天生是支持容错的。首先,它自身是一个不变的(immutable)数据集,其次,它能够记住构建它的操作图(Graph of Operation),因此当执行任务的Worker失败时,完全可以通过操作图获得之前执行的操作,进行重新计算。由于无需采用replication方式支持容错,很好地降低了跨网络的数据传输成本。

不过,在某些场景下,Spark也需要利用记录日志的方式来支持容错。例如,在Spark Streaming中,针对数据进行update操作,或者调用Streaming提供的window操作时,就需要恢复执行过程的中间状态。此时,需要通过Spark提供的checkpoint机制,以支持操作能够从checkpoint得到恢复。
针对RDD的wide dependency,最有效的容错方式同样还是采用checkpoint机制。不过,似乎Spark的最新版本仍然没有引入auto checkpointing机制。

Spark中的RDD操作

hadoop提供的接口只有map和reduce函数,spark是mapreduce的扩展,提供两类操作,而不是两个,使使用更方便,开发时的代码量会尽量的被spark的这种多样的API减少数十倍。

下图是Spark中的RDD两种类型的操作,包括转换(Transformation)和动作(Action)。
1.transformation是得到一个新的RDD,方式很多,比如从数据源生成一个新的RDD,从RDD生成一个新的RDD
2.action是得到一个值,或者一个结果(直接将RDD cache到内存中)
所有的transformation都是采用的懒策略,就是如果只是将transformation提交是不会执行计算的,计算只有在action被提交的时候才被触发。

2.jpg

RDD编程示例

在Spark中,RDD被表示为对象,通过这些对象上的方法(或函数)调用转换。

定义RDD之后,程序员就可以在动作中使用RDD了。动作是向应用程序返回值,或向存储系统导出数据的那些操作,例如,count(返回RDD中的元素个数),collect(返回元素本身),save(将RDD输出到存储系统)。在Spark中,只有在动作第一次使用RDD时,才会计算RDD(即延迟计算)。这样在构建RDD的时候,运行时通过管道的方式传输多个转换。

程序员还可以从两个方面控制RDD,即缓存和分区。用户可以请求将RDD缓存,这样运行时将已经计算好的RDD分区存储起来,以加速后期的重用。缓存的RDD一般存储在内存中,但如果内存不够,可以写到磁盘上。

另一方面,RDD还允许用户根据关键字(key)指定分区顺序,这是一个可选的功能。目前支持哈希分区和范围分区。例如,应用程序请求将两个RDD按照同样的哈希分区方式进行分区(将同一机器上具有相同关键字的记录放在一个分区),以加速它们之间的join操作。在Pregel和HaLoop中,多次迭代之间采用一致性的分区置换策略进行优化,我们同样也允许用户指定这种优化。

示例:控制台日志挖掘:

本部分我们通过一个具体示例来阐述RDD。假定有一个大型网站出错,操作员想要检查Hadoop文件系统(HDFS)中的日志文件(TB级大小)来找出原因。通过使用Spark,操作员只需将日志中的错误信息装载到一组节点的内存中,然后执行交互式查询。首先,需要在Spark解释器中输入如下Scala命令:

  1. lines = spark.textFile("hdfs://...")
  2. errors = lines.filter(_.startsWith("ERROR"))
  3. errors.cache()
复制代码


第1行从HDFS文件定义了一个RDD(即一个文本行集合),第2行获得一个过滤后的RDD,第3行请求将errors缓存起来。注意在Scala语法中filter的参数是一个闭包。

这时集群还没有开始执行任何任务。但是,用户已经可以在这个RDD上执行对应的动作,例如统计错误消息的数目:
  1. errors.count()
复制代码

       
       
用户还可以在RDD上执行更多的转换操作,并使用转换结果,如:

  1. // Count errors mentioning MySQL:
  2.         errors.filter(_.contains("MySQL")).count()
  3.         // Return the time fields of errors mentioning
  4.         // HDFS as an array (assuming time is field
  5.         // number 3 in a tab-separated format):
  6.         errors.filter(_.contains("HDFS"))
  7.          .map(_.split('\t')(3))
  8.         .collect()
复制代码


使用errors的第一个action运行以后,Spark会把errors的分区缓存在内存中,极大地加快了后续计算速度。注意,最初的RDD lines不会被缓存。因为错误信息可能只占原数据集的很小一部分(小到足以放入内存)。

最后,为了说明模型的容错性,下图给出了第3个查询的Lineage图。在lines RDD上执行filter操作,得到errors,然后再filter、map后得到新的RDD,在这个RDD上执行collect操作。Spark调度器以流水线的方式执行后两个转换,向拥有errors分区缓存的节点发送一组任务。此外,如果某个errors分区丢失,Spark只在相应的lines分区上执行filter操作来重建该errors分区。

3.jpg

上图是示例中第三个查询的Lineage图。(方框表示RDD,箭头表示转换)

总结

RDD是Spark的核心,也是整个Spark的架构基础。它的特性可以总结如下:
  • 它是不变的数据结构存储
  • 它是支持跨集群的分布式数据结构
  • 可以根据数据记录的key对结构进行分区
  • 提供了粗粒度的操作,且这些操作都支持分区
  • 它将数据存储在内存中,从而提供了低延迟性


已有(3)人评论

跳转到指定楼层
sprite101 发表于 2015-5-12 19:42:29
回复

使用道具 举报

xiaolv168 发表于 2015-11-25 15:10:53
多谢分享,受益很多
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条