分享

Spark入门二:Spark1.2 RDD初步

问题导读

1.什么是spark RDD?
2.本文讲了RDD哪些特点?
3.有哪两种方式可以创建RDD?
4.RDD Transform基本方法有哪些?
5.RDD Action基本方法有哪些?








什么是RDD
Spark是围绕着RDD(Resilient Distributed Dataset,弹性分布式数据集)建立起来的,也就是说,RDD是Spark框架的核心基石。RDD是一个可容错的数据集,这个数据集合中的数据是可以并行处理的。

RDD的特点:
  • A list of partitions 一系列的分片,比如说64M一片;类似于Hadoop中的split;
  • A function for computing each split   在每个分片上都有一个函数去迭代/执行/计算它
  • A list of dependencies on other RDDs  一系列的依赖:RDD a转换为RDD b,RDD b转换为RDD c,那么RDD c就依赖于RDD b,RDDb就依赖于RDDa
  • Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)  对于key-value的RDD可指定一个partitioner,告诉它如何分片;常用的有hash,range
  • Optionally, a list of preferred location(s) to compute each split on (e.g. block locations for an HDFS file) 要运行的计算/执行最好在哪(几)个机器上运行。数据本地性。
前三个特点对应于Lineage,后两个对应于Optimized execution
对于如上的5个特点,对应于RDD中的5个方法
getPartitionsthe set of partitions in this RDD
computecompute a given partition
getDependenciesreturn how this RDD depends on parent RDDs
partitionerspecify how they are partitioned
getPreferredLocationsspecify placement preferences





HadoopRDDFiltered RDDJoinedRDD
partitionsHDFS上的block与父RDD一致一个partition一个任务
dependencies与父RDD 一对一依赖shuffle的每个父RDD
compute读取每个block的信息计算父RDD的每个分区并过滤读取shuffle数据
partitionerHDFS block所在位置HashPartitioner
preferredLocations无(与父RDD一致)


参考:http://www.cnblogs.com/luogankun/p/3801035.html

Spark ShellSpark Shell通过初始化一个SparkContext,然后通过Scala语言的脚本特性,可以以脚本的方式来学习Spark提供的API,通过这一点也可以看出Scala确实是有比Java方便简洁的特性。如下是Spark Shell的支持的命令参数


1.png

RDD(弹性分布式数据集,Resilent Distributed Dataset)Spark是围绕着RDD(Resilient Distributed Dataset,弹性分布式数据集)建立起来的,也就是说,RDD是Spark框架的核心基石。RDD是一个可容错的数据集,这个数据集合中的数据是可以并行处理的。有两种方式可以创建RDD:
1. 基于开发者提供的数据集来创建RDD
2. 基于引用外部存储系统中的数据集来创建RDD,比如共享文件系统,HDFS,HBase或者Hadoop InputFormat提供的任意数据源都可以用来创建RDD

并行数据集(程序数据)Spark可以通过SparkContext的parallelize方法实现对Scala程序提供的数据集创建RDD。 程序中的元素通过数据拷贝的形式创建一个RDD)。比如,下面的例子创建了一个RDD,它包含了1到5的5个数字,例如下面的Scala代码:

1. 创建ParallelCollectionRDD

2.png

2. 执行ParallelCollectionRDD的collect方法

3.png


3. 执行RDD的count方法


4.png


4. 执行RDD的saveAsTextFile("ParallelCollectionRDD")


  1. rdd.saveAsTextFile("<span style="font-size: 16px;">ParallelCollectionRDD</span>");
复制代码


执行结果是在HDFS的当前用户目录(/user/hadoop)下创建了一个ParallelCollectionRDD目录,并且有个part-00000作为存放文本的文件
  1. [hadoop@hadoop bin]$ hdfs dfs -cat /user/hadoop/ParallelCollectionRDD/part-00000
  2. 1
  3. 2
  4. 3
  5. 4
  6. 5
复制代码
外部数据集1.执行如下操作,则Spark创建了一个RDD(sparkData),而这个RDD的数据来源是HDFS(hdfs://hadoop.master:9000/user/hadoop/sparkdata.txt),也就是说,Spark默认是从当前用户(hadoop)的/user/hadoop下寻找sparkdata.txt文件。


  1. val sparkData = sc.textFile("sparkdata.txt");
复制代码
注:sparkdata.txt的内容如下:

  1. 1 2
  2. 3 4
  3. 5 6
  4. 7 8
  5. 9 10
复制代码
2. sparkData的类型
创建的RDD sparkData的详细信息是如下,可见sparkData是一个MappedRDD类型


  1. sparkData: org.apache.spark.rdd.RDD[String] = sparkdata.txt MappedRDD[4] at textFile at <console>:12
复制代码
RDD基本操作
1.统计sparkData RDD中有多少行
  1. sparkData.count()
复制代码
结果显示如下,得到的结果5,表示sparkData.txt有五行
  
5.png

2. 统计sparkData RDD中所有行的总长度

2.1 Map操作

  1. var lineLengths = sparkData.map(line=>line.length)
复制代码
执行的结果是: lineLengths是五个MappedDD

  1. lineLengths: org.apache.spark.rdd.RDD[Int] = MappedRDD[5] at map at <console>:14
复制代码
2.2 Reduce操作
  1. var total = lineLengthMap.reduce((a,b)=>a + b);
复制代码
执行的结果是:total是一个Int类型的数据。事实上,观察sparkdata.txt的数据,确实是所有行的总长度是16
  1. total: Int = 16
复制代码


3. Key/Pair RDD
3.1 执行如下操作

  1. lines = sc.textFile("sparkdata.txt")
复制代码
结果:

6.png

3.2 执行如下操作

  1. val pairs = lines.map(s => (s, 1))
复制代码
结果:

7.png



3.3 执行如下操作


  1. val counts = pairs.reduceByKey((a, b) => a + b)
复制代码
结果:

8.png


3.4 执行如下操作:

  1. counts.collect()
复制代码


结果:

9.png
10.png

RDD Transform基本方法

Transformation Meaning

map(func)Return a new distributed dataset formed by passing each element of the source through a function func.
filter(func)Return a new dataset formed by selecting those elements of the source on which func returns true.
flatMap(func)Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item).
mapPartitions(func)Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator<T> => Iterator<U> when running on an RDD of type T.
mapPartitionsWithIndex(func)Similar to mapPartitions, but also provides func with an integer value representing the index of the partition, so funcmust be of type (Int, Iterator<T>) => Iterator<U> when running on an RDD of type T.
sample(withReplacement, fraction, seed)Sample a fraction fraction of the data, with or without replacement, using a given random number generator seed.
union(otherDataset)Return a new dataset that contains the union of the elements in the source dataset and the argument.
intersection(otherDataset)Return a new RDD that contains the intersection of elements in the source dataset and the argument.
distinct([numTasks]))Return a new dataset that contains the distinct elements of the source dataset.
groupByKey([numTasks])When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable<V>) pairs.
Note: If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using reduceByKey or combineByKey will yield much better performance.
Note: By default, the level of parallelism in the output depends on the number of partitions of the parent RDD. You can pass an optional numTasks argument to set a different number of tasks.
reduceByKey(func, [numTasks])When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V,V) => V. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral "zero" value. Allows an aggregated value type that is different than the input value type, while avoiding unnecessary allocations. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.
sortByKey([ascending], [numTasks])When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument.
join(otherDataset, [numTasks])When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. Outer joins are supported throughleftOuterJoin, rightOuterJoin, and fullOuterJoin.
cogroup(otherDataset, [numTasks])When called on datasets of type (K, V) and (K, W), returns a dataset of (K, Iterable<V>, Iterable<W>) tuples. This operation is also called groupWith.
cartesian(otherDataset)When called on datasets of types T and U, returns a dataset of (T, U) pairs (all pairs of elements).
pipe(command, [envVars])Pipe each partition of the RDD through a shell command, e.g. a Perl or bash script. RDD elements are written to the process's stdin and lines output to its stdout are returned as an RDD of strings.
coalesce(numPartitions)Decrease the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently after filtering down a large dataset.
repartition(numPartitions)Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them. This always shuffles all data over the network.
repartitionAndSortWithinPartitions(partitioner)Repartition the RDD according to the given partitioner and, within each resulting partition, sort records by their keys. This is more efficient than calling repartition and then sorting within each partition because it can push the sorting down into the shuffle machinery.


RDD Action基本方法
  Action Meaning
reduce(func)Aggregate the elements of the dataset using a function func (which takes two arguments and returns one). The function should be commutative and associative so that it can be computed correctly in parallel.
collect()Return all the elements of the dataset as an array at the driver program. This is usually useful after a filter or other operation that returns a sufficiently small subset of the data.
count()Return the number of elements in the dataset.
first()Return the first element of the dataset (similar to take(1)).
take(n)Return an array with the first n elements of the dataset. Note that this is currently not executed in parallel. Instead, the driver program computes all the elements.
takeSample(withReplacement,num, [seed])Return an array with a random sample of num elements of the dataset, with or without replacement, optionally pre-specifying a random number generator seed.
takeOrdered(n, [ordering])Return the first n elements of the RDD using either their natural order or a custom comparator.
saveAsTextFile(path)Write the elements of the dataset as a text file (or set of text files) in a given directory in the local filesystem, HDFS or any other Hadoop-supported file system. Spark will call toString on each element to convert it to a line of text in the file.
saveAsSequenceFile(path)
(Java and Scala)
Write the elements of the dataset as a Hadoop SequenceFile in a given path in the local filesystem, HDFS or any other Hadoop-supported file system. This is available on RDDs of key-value pairs that either implement Hadoop's Writable interface. In Scala, it is also available on types that are implicitly convertible to Writable (Spark includes conversions for basic types like Int, Double, String, etc).
saveAsObjectFile(path)
(Java and Scala)
Write the elements of the dataset in a simple format using Java serialization, which can then be loaded using SparkContext.objectFile().
countByKey()Only available on RDDs of type (K, V). Returns a hashmap of (K, Int) pairs with the count of each key.
foreach(func)Run a function func on each element of the dataset. This is usually done for side effects such as updating an accumulator variable (see below) or interacting with external storage systems.










已有(2)人评论

跳转到指定楼层
feng01301218 发表于 2015-3-12 15:09:01
回复

使用道具 举报

ainubis 发表于 2015-3-27 16:22:47
学习啦(*^__^*) 嘻嘻……
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条