分享

Spark 2.0中Dataset介绍和使用

本帖最后由 xuanxufeng 于 2016-6-9 14:21 编辑
问题导读

1.什么是dataset?
2.本文认为DataSet和RDD主要的区别是是什么?
3.Dataset Wordcount实例本文用了几步?





Dataset介绍


  Dataset是从Spark 1.6开始引入的一个新的抽象,当时还是处于alpha版本;然而在Spark 2.0,它已经变成了稳定版了。下面是DataSet的官方定义:
[mw_shl_code=bash,true]A Dataset is a strongly typed collection of domain-specific objects that can be transformed
in parallel using functional or relational operations. Each Dataset also has an untyped view
called a DataFrame, which is a Dataset of Row.[/mw_shl_code]


Dataset是特定域对象中的强类型集合,它可以使用函数或者相关操作并行地进行转换等操作。每个Dataset都有一个称为DataFrame的非类型化的视图,这个视图是行的数据集。上面的定义看起来和RDD的定义类似,RDD的定义如下:
[mw_shl_code=bash,true]RDD represents an immutable,partitioned collection of elements that can be operated on in parallel
[/mw_shl_code]

RDD也是可以并行化的操作,DataSet和RDD主要的区别是:DataSet是特定域的对象集合;然而RDD是任何对象的集合。DataSet的API总是强类型的;而且可以利用这些模式进行优化,然而RDD却不行。

  Dataset的定义中还提到了DataFrame,DataFrame是特殊的Dataset,它在编译时不会对模式进行检测。在未来版本的Spark,Dataset将会替代RDD成为我们开发编程使用的API(注意,RDD并不是会被取消,而是会作为底层的API提供给用户使用)。

上面简单地介绍了Dataset相关的定义,下面让我们来看看如何以编程的角度来使用它。


Dataset Wordcount实例


为了简单起见,我将介绍如何使用DataSet编写WordCount计算程序。

第一步、创建SparkSession


我们在这里将使用SparkSession作为程序的切入点,并使用它来创建出Dataset:
[mw_shl_code=scala,true]val sparkSession = SparkSession.builder.
      master("local")
      .appName("example")
      .getOrCreate()[/mw_shl_code]


第二步、读取数据并将它转换成Dataset

我们可以使用read.text API来读取数据,正如RDD版提供的textFile,as[String]可以为dataset提供相关的模式,如下:

[mw_shl_code=bash,true]import sparkSession.implicits._
val data = sparkSession.read.text("src/main/resources/data.txt").as[String][/mw_shl_code]

上面data对象的类型是DataSet[String],我们需要引入sparkSession.implicits._。

第三步、分割单词并且对单词进行分组

Dataset提供的API和RDD提供的非常类似,所以我们也可以在DataSet对象上使用map, groupByKey相关的API,如下:

[mw_shl_code=scala,true]val words = data.flatMap(value => value.split("\\s+"))
val groupedWords = words.groupByKey(_.toLowerCase)[/mw_shl_code]

第三步、分割单词并且对单词进行分组

Dataset提供的API和RDD提供的非常类似,所以我们也可以在DataSet对象上使用map, groupByKey相关的API,如下:

[mw_shl_code=scala,true]val words = data.flatMap(value => value.split("\\s+"))
val groupedWords = words.groupByKey(_.toLowerCase)[/mw_shl_code]

有得同学可能注意到,我们并没有创建出一个key/value键值对,因为DataSet是工作在行级别的抽象,每个值将被看作是带有多列的行数据,而且每个值都可以看作是group的key,正如关系型数据库的group。

第四步、计数
一旦我们有了分组好的数据,我们可以使用count方法对每个单词进行计数,正如在RDD上使用reduceByKey:

[mw_shl_code=bash,true]val counts = groupedWords.count()
[/mw_shl_code]

第五步、打印结果
  正如RDD一样,上面的操作都是懒执行的,所以我们需要调用action操作来触发上面的计算。在dataset API中,show函数就是action操作,它会输出前20个结果;如果你需要全部的结果,你可以使用collect操作:

[mw_shl_code=bash,true]counts.show()
[/mw_shl_code]

完整的代码

[mw_shl_code=scala,true]package com.iteblog.spark

import org.apache.spark.sql.SparkSession

/**
  * Created by http://www.iteblog.com
  */
object DataSetWordCount {

  def main(args: Array[String]) {

    val sparkSession = SparkSession.builder.
      master("local")
      .appName("example")
      .getOrCreate()

    import sparkSession.implicits._
    val data = sparkSession.read.text("src/main/resources/data.txt").as[String]

    val words = data.flatMap(value => value.split("\\s+"))

    val groupedWords = words.groupByKey(_.toLowerCase)

    val counts = groupedWords.count()

    counts.show()

  }

}[/mw_shl_code]









没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条