
Getting Started with SparkSQL

xuanxufeng, posted 2015-11-30 20:21:45 in Getting Started Help
Question Guide

1. What does this post consider the role of SQLContext to be?
2. What DSL methods do DataFrames expose?
3. How do you run SQL?





Based on the Spark 1.5.1 overview.
I. Entry point:


[mw_shl_code=scala,true]val sc: SparkContext // An existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// this is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._[/mw_shl_code]

SQLContext's default parser is "sql", which is used to parse SQL statements.
Besides SQLContext there is also HiveContext, which uses a more complete parser; its default is spark.sql.dialect="hiveql".
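
A minimal sketch of switching the dialect at runtime via setConf, assuming a Hive-enabled Spark build (see section VI.3 for the build requirements):
[mw_shl_code=scala,true]// Requires Spark built with Hive support; sc is an existing SparkContext.
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

// HiveContext defaults to the "hiveql" dialect; it can be switched back to the basic SQL parser.
hiveContext.setConf("spark.sql.dialect", "sql")[/mw_shl_code]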

II. Creating DataFrames
DataFrames can currently be created from existing RDDs, from Hive tables, and from other data sources. See the next post for details.
[mw_shl_code=scala,true]val sc: SparkContext // An existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

val df = sqlContext.read.json("examples/src/main/resources/people.json")

// Displays the content of the DataFrame to stdout
df.show()[/mw_shl_code]


III. DataFrame DSL methods
1. show: print the contents
2. printSchema: print the schema
3. select: pick a subset of columns from the original DataFrame
4. filter: filter rows
5. groupBy: group rows
6. count: count rows
... (a short sketch of these methods follows this list)
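
A minimal sketch of these methods, assuming the df created from people.json in section II:
[mw_shl_code=scala,true]// Assumes df was created from people.json as in section II.
df.show()                                     // print the contents
df.printSchema()                              // print the schema
df.select("name").show()                      // select a single column
df.select(df("name"), df("age") + 1).show()   // select with a column expression
df.filter(df("age") > 21).show()              // keep only rows where age > 21
df.groupBy("age").count().show()              // group by age and count rows per group[/mw_shl_code]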

IV. Running SQL

[mw_shl_code=scala,true]val sqlContext = ...  // An existing SQLContext
val df = sqlContext.sql("SELECT * FROM table")[/mw_shl_code]


V. Schema inference
1. Inferring the schema by reflection from a known format, using case classes
[mw_shl_code=scala,true]// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// this is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._

// Define the schema using a case class.
// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
// you can use custom classes that implement the Product interface.
case class Person(name: String, age: Int)

// Create an RDD of Person objects and register it as a table.
val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
people.registerTempTable("people")

// SQL statements can be run by using the sql methods provided by sqlContext.
val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")

// The results of SQL queries are DataFrames and support all the normal RDD operations.
// The columns of a row in the result can be accessed by field index:
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)

// or by field name:
teenagers.map(t => "Name: " + t.getAs[String]("name")).collect().foreach(println)

// row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]
teenagers.map(_.getValuesMap[Any](List("name", "age"))).collect().foreach(println)
// Map("name" -> "Justin", "age" -> 19)[/mw_shl_code]

2. Without case classes (specifying the schema programmatically)

[mw_shl_code=scala,true]// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// Create an RDD
val people = sc.textFile("examples/src/main/resources/people.txt")

// The schema is encoded in a string
val schemaString = "name age"

// Import Row.
import org.apache.spark.sql.Row;

// Import Spark SQL data types
import org.apache.spark.sql.types.{StructType,StructField,StringType};

// Generate the schema based on the string of schema
val schema =
  StructType(
    schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))

// Convert records of the RDD (people) to Rows.
val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))

// Apply the schema to the RDD.
val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)

// Register the DataFrames as a table.
peopleDataFrame.registerTempTable("people")

// SQL statements can be run by using the sql methods provided by sqlContext.
val results = sqlContext.sql("SELECT name FROM people")

// The results of SQL queries are DataFrames and support all the normal RDD operations.
// The columns of a row in the result can be accessed by field index or by field name.
results.map(t => "Name: " + t(0)).collect().foreach(println)[/mw_shl_code]

VI. Data sources
1. Loading from and saving to Parquet files

[mw_shl_code=scala,true]val df = sqlContext.read.load("examples/src/main/resources/users.parquet")
df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")[/mw_shl_code]

2. Specifying the data source format explicitly, instead of relying on the default source as above
[mw_shl_code=scala,true]val df = sqlContext.read.format("json").load("examples/src/main/resources/people.json")
df.select("name", "age").write.format("parquet").save("namesAndAges.parquet")[/mw_shl_code]

3. Hive tables
To support Hive, Spark must be built with -Phive and -Phive-thriftserver, the datanucleus jars under lib_managed/jars must be on the classpath, and hive-site.xml must be placed in the conf/ directory.
See the Spark 1.5.1 overview for details: http://spark.apache.org/docs/latest/sql-programming-guide.html
[mw_shl_code=scala,true]// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)

sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

// Queries are expressed in HiveQL
sqlContext.sql("FROM src SELECT key, value").collect().foreach(println)[/mw_shl_code]

4. JDBC
The appropriate JDBC driver must first be loaded onto the Spark classpath.
Every worker node also needs to be able to load the driver class, which can be done by placing the driver jars on each node's classpath.
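
A minimal sketch of reading a table over JDBC; the connection URL, table name, and driver class below are hypothetical placeholders:
[mw_shl_code=scala,true]// The connection URL, table name, and driver class are placeholder examples.
val jdbcDF = sqlContext.read.format("jdbc").options(Map(
  "url"     -> "jdbc:postgresql://dbserver:5432/mydb",
  "dbtable" -> "schema.tablename",
  "driver"  -> "org.postgresql.Driver")).load()

jdbcDF.show()[/mw_shl_code]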




