分享

Spark Sql介绍及实际遇到的问题

本帖最后由 levycui 于 2016-9-13 18:01 编辑

问题导读:
1、什么是DataFrames?
2、什么是DataFrames Schema?
3、Spark 1.2 Sql踩过的坑有哪些?




在spark sql 之前,spark团队是以兼容hive为目标而创建了shark,但是由于完全使用hive的东西,导致执行计划无法方便的做优化,并且hive的源码有一些线程安全的问题,所以spark团队不得不放弃的了shark。

spark团队在经历了shark的惨痛教训后,痛下决心,自行实现了名为spark sql 的执行引擎,其仅仅在hive方面hql parser(hive的语法与解析)、Hive Metastore和Hive SerDe,并由Catalyst负责执行计划生成和优化。

Spark Sql 的定位无疑是切合广大开发者,sql语法减少了学习成本,其的优化能力简化了开发者性能调优要求,让开发者不再每句代码都得小心翼翼的。

对hive的支持都移动到了 Hive on Spark中。
2016-09-13_141932.jpg
个人的观点来说,如果要在spark 上使用sql的方式的话,推荐以spark sql为主,对hive的支持与hive的发展总是有一定差距的,并且 spark 团队可能会调整Hive的支持代码.

spark sql 代码使用入口依然符合spark的习惯 - context的声明:
[mw_shl_code=applescript,true]val sc: SparkContext // An existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)[/mw_shl_code]

如果要使用hive的支持,入口只需换为 HiveContext,而不使用SQLContext就行.

DataFrames 介绍
DataFrames 官方解释其为一个由多个有含义和名字的列组织数据的分布式集合, 其的表现形式比较类同与关系型数据库的表概念.

一般来说,都是优先使用DataFrames而不直接使用RDD,因为DataFrames会优化你的执行计划,而RDD则是忠实的按照你代码生成执行计划,并且spark sql 中提供很多方法便利地从多种数据源生成DataFrames.

如下即为从json文件中生成DataFrames以及一些DataFrames的基本使用:
[mw_shl_code=applescript,true]val sc: SparkContext // An existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// 创建DataFrames
val df = sqlContext.read.json("examples/src/main/resources/people.json")

// 显示数据内容
df.show()
// age  name
// null Michael
// 30   Andy
// 19   Justin

// 打印DataFrames的结构
df.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)

// 获取name列的数据
df.select("name").show()
// name
// Michael
// Andy
// Justin

// 获取name和age列的数据,但是age的每个值都+1
df.select(df("name"), df("age") + 1).show()
// name    (age + 1)
// Michael null
// Andy    31
// Justin  20

// 获取age大于21的数据
df.filter(df("age") > 21).show()
// age name
// 30  Andy

// 查询age的分组计数情况
df.groupBy("age").count().show()
// age  count
// null 1
// 19   1
// 30   1
[/mw_shl_code]

spark sql也支持隐式将RDD转化为DataFrame,只需加入如下代码:

[mw_shl_code=applescript,true]// this is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._[/mw_shl_code]

DataFrames Schema 介绍
DataFrame 可以以两种不同的方式从RDD转化而来,其实准确来说,转化只有一种方式,只是如何识别RDD,如何对应DataFrame的Schema信息有两种方式。
通过现有的class获取schema信息

例如如下代码:
[mw_shl_code=applescript,true]val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._

case class Person(name: String, age: Int)
val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()[/mw_shl_code]

但是case calss 在scala 2.10中字段数量是有限的,只能22个,为了规避这个问题,你可以通过实现scala的Product接口:
通过构建具体的schema信息,比如设定字段名、类型等

例如如下代码:
[mw_shl_code=applescript,true]val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.{StructType,StructField,StringType};

val schema =
  StructType(
    "name age".split(" ").map(fieldName => StructField(fieldName, StringType, true)))

val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))

val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)[/mw_shl_code]

Spark Sql Demo
我们的demo是简单的用sql的方式从csv文件中筛选数据。
[mw_shl_code=applescript,true]package Demo

import org.apache.spark._
import org.apache.spark.sql._

import scala.collection.mutable

object SqlDemo {
  def main(argment: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("test").setMaster("local") //本地调试运行
    val sc = new SparkContext(conf)  // 建立spark操作上下文
    val sqlContext = new SQLContext(sc) // 建立spark sql 操作上下文

    val rowRDD = sc.textFile("xxxPath\\people.csv")   //读取 csv 文件
      .map(line => {                                  // 转换成 row 结构
        val data = line.split(";")                    // 切分字段
        val list = mutable.ArrayBuffer[Any]()
        list.append(data(0))                          //填充数据
        list.append(data(1).toInt)                   // 填充数据
        Row.fromSeq(list)                           //创建row
    })

    val fields = new mutable.ArrayBuffer[StructField]()
    fields.append(StructField("name",StringType,true))  //添加 name schema
    fields.append(StructField("age",IntegerType,true))  //添加 age schema
    val schema = StructType(fields)                     //创建schema 结构

    val rdd = sqlContext.applySchema(rowRDD, schema)  //将rdd与schema做匹配

    rdd.registerTempTable("people")                 //注册临时表

    sqlContext.sql("select * from people where age >= 18")     // 筛选成年人士
      .collect()
      .foreach(println)

    sc.stop()
  }
}[/mw_shl_code]


Spark Sql 坑
由于公司是1.2 的spark,所以我遇见的问题不一定在其他版本存在。
  •     spark sql 的字段居然是大写区分的,囧,一旦大小写对应不上,sql异常,你就只能眼花了
  •     spark sql 的错误提示信息太奇葩了,一点也不准确,比如你where 条件的字段名不对,绝对不会提示你哪个字段找不到,直接把select 的n多字段列出来,把你看晕死
  •     本地测试时一定要使用多核心模式,否则多线程问题你是看不出来的,并且单核心真心慢死人
  •     udf function 必须是可序列化到分布式环境的,如果出现无法序列化的问题,多半是你不小心使用了类成员等依赖于某个类的变量或方法,在scala中可放在伴生类中
  •     spark sql StructField 的 DataType 无法转为 DecimalType, 当我们根据 StructField做处理时,DecimalType暂时无处处理
  •     spark sql 的 schemaRDD 实际列少于定义,比如使用union all 时如果列数不一样,在运行时会报java.lang.ArrayIndexOutOfBoundsException错误,该异常是不是很好看懂呢?

来源:gitbooks
作者:fs7744


已有(2)人评论

跳转到指定楼层
alu1105 发表于 2016-9-14 14:54:07
sparksql真心没有impala好用
回复

使用道具 举报

905018111 发表于 2016-9-20 22:17:08
感谢楼主分享
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条