分享

Spark SQL之External DataSource外部数据源(一)示例

gefieder 2015-4-20 21:48:53 发表于 实操演练 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 0 24354
本帖最后由 nettman 于 2015-4-20 23:58 编辑
问题导读:
1、创建外部数据源表的语句是?
2、DDL如何创建外部数据源表jsonTable?
3、使用Spark SQL如何执行查询?




一、Spark SQL External DataSource简介  
随着Spark1.2的发布,Spark SQL开始正式支持外部数据源。Spark SQL开放了一系列接入外部数据源的接口,来让开发者可以实现。
  这使得Spark SQL支持了更多的类型数据源,如json, parquet, avro, csv格式。只要我们愿意,我们可以开发出任意的外部数据源来连接到Spark SQL。之前大家说的支持HBASE,Cassandra都可以用外部数据源的方式来实现无缝集成。
(Ps: 关于External Datasource源码解析篇请移步至:Spark SQL之External DataSource外部数据源(二)源码分析)


二、External DataSource  
拿Spark1.2的json为例,它支持已经改为了实现了外部数据源的接口方式。所以除了先前我们操作json的API,又多了一种DDL创建外部数据源的方式。
  parquetFile的操作方式也如下类似,就不一一列举了。

2.1 SQL方式 CREATE TEMPORARY TABLE USING OPTIONS
在Spark1.2之后,支持了一种CREATE TEMPORARY TABLE USING OPTIONS的DDL语法来创建外部数据源的表。
  1. CREATE TEMPORARY TABLE jsonTable
  2. USING org.apache.spark.sql.json
  3. OPTIONS (
  4.   path '/path/to/data.json'
  5. )
复制代码

1、操作示例:
我们拿example下people.json文件来做示例。
  1. shengli-mac$ cat /Users/shengli/git_repos/spark/examples/src/main/resources/people.json
  2. {"name":"Michael"}
  3. {"name":"Andy", "age":30}
  4. {"name":"Justin", "age":19}
复制代码

2、DDL创建外部数据源表jsonTable:
  1. 14/12/21 16:32:14 INFO repl.SparkILoop: Created spark context..
  2. Spark context available as sc.
  3. scala> import org.apache.spark.sql.SQLContext
  4. import org.apache.spark.sql.SQLContext
  5. scala> val sqlContext  = new SQLContext(sc)
  6. sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@7be62956
  7. scala> import sqlContext._
  8. import sqlContext._
  9. //创建jsonTable外部数据源表,并且指定其数数据源文件是people.json这个json文件,同时指定使用org.apache.spark.sql.json该类型的隐式转化类(这个后续文章会介绍)
  10. scala> val jsonDDL = s"""
  11.      | |CREATE TEMPORARY TABLE jsonTable
  12.      | |USING org.apache.spark.sql.json
  13.      | |OPTIONS (
  14.      | | path  'file:///Users/shengli/git_repos/spark/examples/src/main/resources/people.json'
  15.      | |)""".stripMargin
  16. jsonDDL: String =
  17. "
  18. CREATE TEMPORARY TABLE jsonTable
  19. USING org.apache.spark.sql.json
  20. OPTIONS (
  21. path  'file:///Users/shengli/git_repos/spark/examples/src/main/resources/people.json'
  22. )"
  23. scala> sqlContext.sql(jsonDDL).collect() //创建该外部数据源表jsonTable
  24. 14/12/21 16:44:27 INFO scheduler.DAGScheduler: Job 0 finished: reduce at JsonRDD.scala:57, took 0.204461 s
  25. res0: Array[org.apache.spark.sql.Row] = Array()
复制代码

我们来看下该schemaRDD:
  1. scala> val jsonSchema = sqlContext.sql(jsonDDL)
  2. jsonSchema: org.apache.spark.sql.SchemaRDD =
  3. SchemaRDD[7] at RDD at SchemaRDD.scala:108
  4. == Query Plan ==
  5. == Physical Plan ==
  6. ExecutedCommand (CreateTableUsing jsonTable, org.apache.spark.sql.json, Map(path -> file:///Users/shengli/git_repos/spark/examples/src/main/resources/people.json))
复制代码

ExecutedCommand来取把数据用spark.sql.json的方式从path加载到jsonTable中。涉及到得类是CreateTableUsing,后续源码分析会讲到。
各阶段执行计划情况:
  1. scala> sqlContext.sql("select * from jsonTable").queryExecution
  2. res6: org.apache.spark.sql.SQLContext#QueryExecution =
  3. == Parsed Logical Plan ==
  4. 'Project [*]
  5. 'UnresolvedRelation None, jsonTable, None
  6. == Analyzed Logical Plan ==
  7. Project [age#0,name#1]
  8. Relation[age#0,name#1] JSONRelation(file:///Users/shengli/git_repos/spark/examples/src/main/resources/people.json,1.0)
  9. == Optimized Logical Plan ==
  10. Relation[age#0,name#1] JSONRelation(file:///Users/shengli/git_repos/spark/examples/src/main/resources/people.json,1.0)
  11. == Physical Plan ==
  12. PhysicalRDD [age#0,name#1], MapPartitionsRDD[27] at map at JsonRDD.scala:47
  13. Code Generation: false
  14. == RDD ==
复制代码
至此,创建加载外部数据源到Spark SQL已经完成。
我们可以使用任何我们希望的方式来查询:
3、SQL查询方式:
  1. scala> sqlContext.sql("select * from jsonTable")
  2. 21 16:52:13 INFO spark.SparkContext: Created broadcast 6 from textFile at JSONRelation.scala:39
  3. res2: org.apache.spark.sql.SchemaRDD =
  4. SchemaRDD[20] at RDD at SchemaRDD.scala:108
  5. == Query Plan ==
  6. == Physical Plan ==
  7. PhysicalRDD [age#2,name#3], MapPartitionsRDD[24] at map at JsonRDD.scala:47
复制代码
执行查询:
  1. scala> sqlContext.sql("select * from jsonTable").collect()
  2. res1: Array[org.apache.spark.sql.Row] = Array([null,Michael], [30,Andy], [19,Justin])
复制代码

2.2 API方式
sqlContext.jsonFile
  1. scala> val json = sqlContext.jsonFile("file:///Users/shengli/git_repos/spark/examples/src/main/resources/people.json")
  2. scala> json.registerTempTable("jsonFile")
  3. scala> sql("select * from jsonFile").collect()
  4. res2: Array[org.apache.spark.sql.Row] = Array([null,Michael], [30,Andy], [19,Justin])
复制代码

三、总结  总的来说,Spark SQL 在努力的向各种数据源靠拢,希望让Spark SQL能和其它许多类型的数据源的集成。
  Spark SQL提供的了一种创建加载外部数据源表的DDL语法:CREATE TEMPORARY TABLE USING OPTIONS
  Spark SQL对外开放了一系列的扩展接口,能够通过实现这些接口,来实现对不同的数据源接入,如avro, csv, parquet,json, etc......

资料来源:http://blog.csdn.net/oopsoom/article/details/42061077


欢迎加入about云群371358502、39327136,云计算爱好者群,亦可关注about云腾讯认证空间||关注本站微信

没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条