分享

探索Spark源码---RDD模型

pig 发表于 2016-1-4 15:49:35 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 1 8919
RDD(Resilient Distributed Datasets) ,弹性分布式数据集, 是分布式内存的一个抽象概念,RDD提供了一种高度受限的共享内存模型,即RDD是只读的记录分区的集合,只能通过在其他RDD执行确定的转换操作(如map、join和group by)而创建,然而这些限制使得实现容错的开销很低。对开发者而言,RDD可以看作是Spark的一个对象,它本身运行于内存中,如读文件是一个RDD,对文件计算是一个RDD,结果集也是一个RDD ,不同的分片、 数据之间的依赖 、key-value类型的map数据都可以看做RDD。在这个章节中我们我们还以第一小节中给出的sparkPi示例程序来进行讲解。在讲解RDD之前,我们先逐一介绍Spark中的重要概念:
l  RDD:(Resilient Distributed DataSets)弹性分布式数据集
l 创建操作:(creation operation)RDD的初始创建工作都是由SparkContext来完成的,可以使用基于Scala的集合或者是外部的数据源。
l 转换操作:将一个RDD经过操作转换成另外一个RDD,比如ParallelCollectionRDD经过map操作转换成MapPartitionsRDD,map,filter,flatmap等都是转换操作。
l 控制操作:(control operation)对RDD进行持久化操作。可以让RDD保存在磁盘或则内存中如内存文件系统Tachyon,以便后续重复使用。
l 行动操作:(action operation) 对RDD进行行动操作将会触发作业的提交和运行,并产生最后的结果。
13.2.1 RDD初探
RDD是一个能被分区的只读数据集,生成RDD有两种途径,一种是通过SparkContext利用Scala原生集合或者外部数据源生成,另外一种是通过RDD转换得到。RDD之间的转换操作被记录下来,来构建形成RDD的继承关系(lineage),通过lineage可以有效的进行容错处理,而不需要将实际的RDD数据进行拷贝。因此在Spark中我们使用到的每一个RDD,在丢失或者操作失败都可以重建。抽象的RDD接口包括五个主要的属性:
Partition
分区,一个RDD有至少一个分区
preferredLocation(p)
对于分区p,返回计算的本地化节点
Dependencies()
RDD依赖
Compute(p,context)
对于分区p的计算方法
Partitioner()
RDD分区函数
那么在RDD中,继承关系是如何体现的呢?在我们的sparkPi示例程序中,通过sparkContext创建RDD: sparkContext.parallelize(1 until n, slices),那就让我们进入到parallelize这个方法,在parallelize方法中有如下代码:
new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
在这个地方实例化了一个ParallelCollectionRdd,它实现了RDD接口并重写了RDD接口中的方法。PrallelCollectionRdd定义如下:
private[spark] class ParallelCollectionRDD[T: ClassTag](
    @transient sc: SparkContext,
    @transient data: Seq[T],
    numSlices: Int,
    locationPrefs: Map[Int, Seq[String]])
    extends RDD[T](sc, Nil)
在上面代码中我们注意到extends RDD[T](sc,Nil),将sc和Nil传入RDD抽象类的主构造其中,RDD主构造器代码如下:
abstract class RDD[T: ClassTag](
    @transient private var _sc: SparkContext,
    @transient private var deps: Seq[Dependency[_]]
  ) extends Serializable with Logging {
第二个参数表示RDD的依赖,PrallelCollectionRdd没有父依赖,所以传入Nil对象,表示一个空的列表。
再来看我们的map反方法,map方法将PrallelCollectionRdd转换成MapPatitionRdd,在map方法中有如下代码:
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
我们可以看到,使用new关键字实例化出来一个MapPartitionRdd,让我们跟进MapPartitionRDD的构造方法:
private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
    prev: RDD[T],
    f: (TaskContext, Int, Iterator[T]) => Iterator[U],  // (TaskContext, partition index, iterator)
    preservesPartitioning: Boolean = false)
  extends RDD[U](prev)
在上面的代码中,我们可以看到extends RDD[U](prev)这样的代码,这句代码调用了RDD的辅助构造函数,进入RDD的这个辅助构造函数:
def this(@transient oneParent: RDD[_]) =
    this(oneParent.context , List(new OneToOneDependency(oneParent)))
在这个方法里面,我们看到构建了一个OneToOneDependency依赖,将oneParent传入了新构建的MapPartitionRdd,这里的oneParent即是上诉代码中new MapPartitionsRdd[U,T](this,(context,pid,iter)=>iter.map(cleanF))中传入的this,而该this即是PrallelCollectionRdd,因此新建的MapPartitionRdd中得到了PrallelCollectionRdd的引用,即建立了RDD转换的之间的世代关系(lineage)。
至此,我们回答了上面的问题,通过spark提供的RDD转换操作,会将转换之前的RDD传入转换之后的RDD,以形成RDD之间转换的世代关系。
上面以跟踪代码的方式,走读了RDD lineage的创建,接下来我们将分析一下RDD中的五个常用的属性或者说方法。
Partitions:
在RDD接口中我们可以看到partitions这个属性,对于一个RDD而言,分区数的多少涉及对这个RDD进行并行计算的粒度,每一个RDD分区的计算操作都在一个独立的任务中进行。RDD分区数量可以指定,如果没有指定分区数量,程序将会使用默认分区数。系统默认的数值是程序所分配的资源的CPU核的个数。我们通过RDD的partitions的size方法可以得到当前RDD分区的数量,如:rdd.partitions.size。RDD接口中partitions代码如下:
final def partitions: Array[Partition] = {
    checkpointRDD.map(_.partitions).getOrElse {
      if (partitions_ == null) {
        partitions_ = getPartitions
      }
      partitions_
    }
  }
从代码中我们可以看到partitions是一个数组,数组里面存放的是Partition对象。由RDD的子类实现的getPartitions方法,返回分区。
preferredLocations(p):
RDD位置属性与Spark的调度相关,返回的是p分区所存储的位置,按照“移动数据不如移动代码”的理念,在Spark任务调度的时候尽可能将数据块分配到数据块所存储的位置。preferredLocations返回每一个数据块所在的机器名称或则IP地址,比如在HDFS中,数据块是冗余存放的,则preferredLocations返回这个数据块(分区)所在机器地址的一个数组。调度程序根据这些返回的地址信息,更加有效的进行任务调度,最大可能保证数据计算的本地化,减少网络传输。
Dependencies:
   在RDD的转换操作中,每一次转换操作都会生成新的RDD,因此RDD之间就会形成类似与流水线的依赖关系。这种依赖关系有两种:窄依赖(Narrow Dependencies)和宽依赖(wide dependencies).
窄依赖:每一个父RDD最多只被子RDD的一个分区使用(一对一关系)。
宽依赖:多个子RDD的分区会依赖同一个父RDD的分区(一对多关系)。  
file:////tmp/wps-yun/ksohtml/wpsrBuDDq.jpg
那么在设计RDD依赖的时候,为什么考虑到有宽窄依赖呢?原因有两:第一,窄依赖可以在一个节点上如流水一般的执行,可以计算所有父RDD的分区,而宽依赖需要取得父RDD所有分区上的数据进行计算,将会执行shuffle操作,而shuffle操作是DAGSchedular划分stage的依据。第二,对于窄依赖来说节点计算失败后的恢复更加有效,只需要重新计算父RDD对应的分区,而且可以在其他节点上并行的进行,而对于宽依赖一个节点的失败将导致其父RDD的多个分区重新计算,而这个计算代价是非常高昂的。
Compute(p,context):
RDD的计算都是基于partition为单位的,RDD中的compute函数都是对迭代器的复合,不需要保存每次计算的结果。Compute函数会在action操作时被触发。Compute(p,context)方法返回一个迭代器,当调用迭代器的方法时,将最终计算出结果。
Partitioner():分区函数
     Partitioner是RDD中的分区函数,目前在Spark中实现了两种分区函数:HashPartitioner(哈希分区)和RangePatitioner(区域分区),并且partitioner只存在(K,V)类型的RDD中,对于非(K,V)类型的RDD partitioner的值是None。Partitioner函数既决定了RDD本身的分区数量,也可以作为父RDD输出中每个分区进行数据切割的依据。
    至此我们对RDD五大属性(方法)介绍告一段落。我们再次对RDD进行总结:RDD弹性分布式数据集(Resilient Distributed Datasets),它具备像MapReduce等数据流模型的容错特性,并且允许开发人员在大型集群上执行基于内存的计算。现有的数据流系统对两种应用的处理并不高效:一是迭代式算法,这在图应用和机器学习领域很常见;二是交互式数据挖掘工具。这两种情况下,将数据保存在内存中能够极大地提高性能。为了有效地实现容错,RDD提供了一种高度受限的共享内存,即RDD是只读的,并且只能通过其他RDD上的批量操作来创建。尽管如此,RDD仍然足以表示很多类型的计算,包括MapReduce和专用的迭代编程模型 。
    RDD是只读的、分区记录的集合。RDD只能基于在稳定物理存储中的数据集和其他已有的RDD上执行确定性操作来创建。这些确定性操作称之为转换,如map、filter、groupBy、join(转换不是开发人员在RDD上执行的操作) 。
    RDD不需要物化。RDD含有如何从其他RDD衍生(即计算)出本RDD的相关信息(即Lineage),据此可以从物理存储的数据计算出相应的RDD分区。
RDD作为数据结构,本质上是一个只读的分区记录集合。一个RDD可以包含多个分区,每个分区就是一个 dataset片段。RDD可以相互依赖。如果RDD的每个分区最多只能被一个Child RDD的一个分区使用,则称之为narrow dependency;若多个Child RDD分区都可以依赖,则称之为wide dependency。不同的操作依据其特性,可能会产生不同的依赖。例如map操作会产生narrow dependency,而join操作则产生wide dependency

已有(1)人评论

跳转到指定楼层
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条