分享

spark2 sql读取数据源编程学习样例2:函数实现详解

本帖最后由 pig2 于 2017-12-15 18:12 编辑
问题导读

1.RDD转换为DataFrame需要导入哪个包?
2.Json格式的Dataset如何转换为DateFrame?
3.如何实现通过jdbc读取和保存数据到数据源?






spark2 sql读取数据源编程学习样例1
http://www.aboutyun.com/forum.php?mod=viewthread&tid=23484

runBasicParquetExample函数
这里接着上篇,继续阅读代码,下面我们看看runBasicParquetExample函数的功能实现
[mw_shl_code=scala,true] private def runBasicParquetExample(spark: SparkSession): Unit = {
    // $example on:basic_parquet_example$
    // Encoders for most common types are automatically provided by importing spark.implicits._
    import spark.implicits._

    val peopleDF = spark.read.json("examples/src/main/resources/people.json")

    // DataFrames can be saved as Parquet files, maintaining the schema information
    peopleDF.write.parquet("people.parquet")

    // Read in the parquet file created above
    // Parquet files are self-describing so the schema is preserved
    // The result of loading a Parquet file is also a DataFrame
    val parquetFileDF = spark.read.parquet("people.parquet")

    // Parquet files can also be used to create a temporary view and then used in SQL statements
    parquetFileDF.createOrReplaceTempView("parquetFile")
    val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19")
    namesDF.map(attributes => "Name: " + attributes(0)).show()
    // +------------+
    // |       value|
    // +------------+
    // |Name: Justin|
    // +------------+
    // $example off:basic_parquet_example$
  }[/mw_shl_code]
这里面有一个包的导入
[mw_shl_code=scala,true]    import spark.implicits._
[/mw_shl_code]
Scala中与其它语言的区别是在对象,函数中可以导入包。这个包的作用是转换RDD为DataFrame。[mw_shl_code=scala,true]    val peopleDF = spark.read.json("examples/src/main/resources/people.json")
[/mw_shl_code]
上面自然是读取json文件。
peopleDF.write.parquet("people.parquet")

这里同样是保存文件,不过people.parquet是文件夹。文件夹里面是数据,其中有*00000*为数据文件。
[mw_shl_code=scala,true]val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19")[/mw_shl_code]
这里调用sql语句。
[mw_shl_code=scala,true]    namesDF.map(attributes => "Name: " + attributes(0)).show()
[/mw_shl_code]这里通过map映射,增加Name:
[mw_shl_code=scala,true]// +------------+
    // |       value|
    // +------------+
    // |Name: Justin|
    // +------------+
    // $example off:basic_parquet_example$[/mw_shl_code]



runParquetSchemaMergingExample函数
[mw_shl_code=scala,true]private def runParquetSchemaMergingExample(spark: SparkSession): Unit = {
    // $example on:schema_merging$
    // This is used to implicitly convert an RDD to a DataFrame.
    import spark.implicits._

    // Create a simple DataFrame, store into a partition directory
    val squaresDF = spark.sparkContext.makeRDD(1 to 5).map(i => (i, i * i)).toDF("value", "square")
    squaresDF.write.parquet("data/test_table/key=1")

    // Create another DataFrame in a new partition directory,
    // adding a new column and dropping an existing column
    val cubesDF = spark.sparkContext.makeRDD(6 to 10).map(i => (i, i * i * i)).toDF("value", "cube")
    cubesDF.write.parquet("data/test_table/key=2")

    // Read the partitioned table
    val mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table")
    mergedDF.printSchema()

    // The final schema consists of all 3 columns in the Parquet files together
    // with the partitioning column appeared in the partition directory paths
    // root
    //  |-- value: int (nullable = true)
    //  |-- square: int (nullable = true)
    //  |-- cube: int (nullable = true)
    //  |-- key: int (nullable = true)
    // $example off:schema_merging$
  }[/mw_shl_code]
[mw_shl_code=scala,true]    val squaresDF = spark.sparkContext.makeRDD(1 to 5).map(i => (i, i * i)).toDF("value", "square")
    squaresDF.write.parquet("data/test_table/key=1")[/mw_shl_code]
上面是创建一个RDD,然后通过toDF转换为DataFrame。然后保存到分区目录下。
[mw_shl_code=scala,true]  val cubesDF = spark.sparkContext.makeRDD(6 to 10).map(i => (i, i * i * i)).toDF("value", "cube")
    cubesDF.write.parquet("data/test_table/key=2")[/mw_shl_code]
创建另外一个DataFrame,并且添加一个新列,删除现有列[mw_shl_code=scala,true] val mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table")
    mergedDF.printSchema()[/mw_shl_code]
上面自然是读取数据保存为DataFrame,option("mergeSchema", "true"), 默认值由spark.sql.parquet.mergeSchema指定。设置所有的分区文件是否合并Schema。设置后将覆盖spark.sql.parquet.mergeSchema指定值。

runJsonDatasetExample函数
[mw_shl_code=scala,true]private def runJsonDatasetExample(spark: SparkSession): Unit = {
    // $example on:json_dataset$
    // Primitive types (Int, String, etc) and Product types (case classes) encoders are
    // supported by importing this when creating a Dataset.
    import spark.implicits._

    // A JSON dataset is pointed to by path.
    // The path can be either a single text file or a directory storing text files
    val path = "examples/src/main/resources/people.json"
    val peopleDF = spark.read.json(path)

    // The inferred schema can be visualized using the printSchema() method
    peopleDF.printSchema()
    // root
    //  |-- age: long (nullable = true)
    //  |-- name: string (nullable = true)

    // Creates a temporary view using the DataFrame
    peopleDF.createOrReplaceTempView("people")

    // SQL statements can be run by using the sql methods provided by spark
    val teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")
    teenagerNamesDF.show()
    // +------+
    // |  name|
    // +------+
    // |Justin|
    // +------+

    // Alternatively, a DataFrame can be created for a JSON dataset represented by
    // a Dataset[String] storing one JSON object per string
    val otherPeopleDataset = spark.createDataset(
      """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
    val otherPeople = spark.read.json(otherPeopleDataset)
    otherPeople.show()
    // +---------------+----+
    // |        address|name|
    // +---------------+----+
    // |[Columbus,Ohio]| Yin|
    // +---------------+----+
    // $example off:json_dataset$
  }[/mw_shl_code]上面有些代码重复了,就不在解释了。
[mw_shl_code=scala,true]val path = "examples/src/main/resources/people.json"    val peopleDF = spark.read.json(path)

    // The inferred schema can be visualized using the printSchema() method
    peopleDF.printSchema()
    // root
    //  |-- age: long (nullable = true)
    //  |-- name: string (nullable = true)
[/mw_shl_code]
上面是读取json,并显示schema。
[mw_shl_code=scala,true]// Creates a temporary view using the DataFrame
    peopleDF.createOrReplaceTempView("people")

    // SQL statements can be run by using the sql methods provided by spark
    val teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")
    teenagerNamesDF.show()[/mw_shl_code]
peopleDF.createOrReplaceTempView("people")是DataFrame注册为people表
[mw_shl_code=scala,true]teenagerNamesDF.show()[/mw_shl_code]自然是显示数据。
如下
[mw_shl_code=scala,true]  // +------+
    // |  name|
    // +------+
    // |Justin|
    // +------+[/mw_shl_code]
[mw_shl_code=scala,true]val otherPeopleDataset = spark.createDataset(
"""{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)[/mw_shl_code]
这里创建一个json格式的dataset
[mw_shl_code=scala,true]    val otherPeople = spark.read.json(otherPeopleDataset)
[/mw_shl_code]
这行代码,是读取上面创建的dataset,然后创建DataFrame。从上面我们看出这也是dataset和DataFrame转换的一种方式。

runJdbcDatasetExample函数
[mw_shl_code=scala,true]private def runJdbcDatasetExample(spark: SparkSession): Unit = {
    // $example on:jdbc_dataset$
    // Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
    // Loading data from a JDBC source
    val jdbcDF = spark.read
      .format("jdbc")
      .option("url", "jdbc:postgresql:dbserver")
      .option("dbtable", "schema.tablename")
      .option("user", "username")
      .option("password", "password")
      .load()

    val connectionProperties = new Properties()
    connectionProperties.put("user", "username")
    connectionProperties.put("password", "password")
    val jdbcDF2 = spark.read
      .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
    // Specifying the custom data types of the read schema
    connectionProperties.put("customSchema", "id DECIMAL(38, 0), name STRING")
    val jdbcDF3 = spark.read
      .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)

    // Saving data to a JDBC source
    jdbcDF.write
      .format("jdbc")
      .option("url", "jdbc:postgresql:dbserver")
      .option("dbtable", "schema.tablename")
      .option("user", "username")
      .option("password", "password")
      .save()

    jdbcDF2.write
      .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)

    // Specifying create table column data types on write
    jdbcDF.write
      .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")
      .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
    // $example off:jdbc_dataset$
  }
}[/mw_shl_code]
这个是运行Jdbc Dataset的例子。那么如何从jdbc读取数据,是通过下面各个option
[mw_shl_code=scala,true] .option("url", "jdbc:postgresql:dbserver")
      .option("dbtable", "schema.tablename")
      .option("user", "username")
      .option("password", "password")[/mw_shl_code]
第一行server也就是服务器地址
第二行是表名
第三行是用户名
第四行为密码,相信大家也能看明白。[mw_shl_code=scala,true]    val connectionProperties = new Properties()
[/mw_shl_code]
Properties这个是用来做什么的那?
我们来看官网
conn.jpg

它是
JDBC database 连接的一个参数,是一个字符串tag/value的列表。于是有了下面内容
[mw_shl_code=scala,true] connectionProperties.put("user", "username")
    connectionProperties.put("password", "password")[/mw_shl_code]
我们看到上面放入了用户名和密码
[mw_shl_code=scala,true]val jdbcDF2 = spark.read
      .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)[/mw_shl_code]
这里设置了连接url,表名,还有connectionProperties
connectionProperties.put("customSchema", "id DECIMAL(38, 0), name STRING")
上面是指定读取Schema的自定义数据类型。
[mw_shl_code=scala,true]
    jdbcDF.write
      .format("jdbc")
      .option("url", "jdbc:postgresql:dbserver")
      .option("dbtable", "schema.tablename")
      .option("user", "username")
      .option("password", "password")
      .save()
   jdbcDF2.write
      .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)

    // Specifying create table column data types on write
    jdbcDF.write
      .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")
      .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)

[/mw_shl_code]
上面分别都是将数据通过jdbc保存到数据库

相关文章:
spark2 sql读取json文件的格式要求
http://www.aboutyun.com/forum.php?mod=viewthread&tid=23478


spark2 sql读取json文件的格式要求续:如何查询数据
http://www.aboutyun.com/forum.php?mod=viewthread&tid=23483

spark2 sql编程之实现合并Parquet格式的DataFrame的schema
http://www.aboutyun.com/forum.php?mod=viewthread&tid=23518

spark2 sql编程样例:sql操作
http://www.aboutyun.com/forum.php?mod=viewthread&tid=23501

spark2 sql读取数据源编程学习样例1:程序入口、功能等知识详解
http://www.aboutyun.com/forum.php?mod=viewthread&tid=23484


spark2 sql读取数据源编程学习样例2:函数实现详解
http://www.aboutyun.com/forum.php?mod=viewthread&tid=23489


spark2 sql编程之实现合并Parquet格式的DataFrame的schema
http://www.aboutyun.com/forum.php?mod=viewthread&tid=23518





本帖被以下淘专辑推荐:

没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条