问题导读
1.sparkSQL中核心的组件是什么?
2.SchemaRDD组成都包含什么?
3.sparkSQL是否支持多种数据源?
4.spark SQL支持hive哪些功能,不支持哪些功能?
spark SQL支持像SQL,hiveQL或scala那样的关系型查询语句,sparkSQL中核心的组件是SchemaRDD,SchemaRDD由row对象和描述每行中每列的数据类型的schema组成。SchemaRDD和关系型数据库中的表类似,可以从已存在的RDD,parquet文件,json数据集或运行HiveQL创建而来。
sparkSQL中最重要的一个类是SQLContext或它的继承类,创建SQLContext如下:
- val sc: SparkContext // An existing SparkContext.
- val sqlContext = new org.apache.spark.sql.SQLContext(sc)
- // createSchemaRDD is used to implicitly convert an RDD to a SchemaRDD.
- import sqlContext.createSchemaRDD
复制代码
除了基本的SQLContext外,还可以创建HiveContext,它提供了SQLContext所没有的超函数,其他的特征包括使用更复杂的HiveQL解析器进行解析查询语句、访问HiveUDF、从hive表中读取数据。当使用HiveContext,不必安装hive,并且任何SQLContext可用的数据源都可用于HiveContext。
指定的用于解析查询语句的SQL变体可通过spark.sql.dialect选项进行设置,该参数可通过SQLContext的setConf方法或在SQL中使用SET key=value进行设置。对于SQLContext来说,唯一可用的dialect是sql,它使用了SparkSQL提供的一个简单的SQL解析器,对于HiveContext来说,默认的dialect是hiveql,sql也是可用的,但HiveQL解析器会复杂的多。
数据源sparkSQL支持多种数据源,SchemaRDD可被当作普通的RDD操作并能被注册为一个临时表,注册SchemaRDD成为一个表允许你在它的数据上运行SQL查询。本节描述了多种将数据导入SchemaRDD的方法。
RDDsparkSQL支持两种不同的方式来将已存在的RDD转换为SchemaRDD,第一种方式使用反射来推断RDD的schema,这种方式会有很多简洁的代码并且能在你书写spark应用程序时已经知道schema的情况下很好的工作。
使用反射推断schemasparkSQL中的scala接口支持自动将包含case类的RDD转换为SchemaRDD,其中case类定义了表的schema。case类中的属性会被用于表中的列名,case类还可以被嵌套或包含复杂类型如Sequence或Array。 - // sc is an existing SparkContext.
- val sqlContext = new org.apache.spark.sql.SQLContext(sc)
- // createSchemaRDD is used to implicitly convert an RDD to a SchemaRDD.
- import sqlContext.createSchemaRDD
- // Define the schema using a case class.
- // Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
- // you can use custom classes that implement the Product interface.
- case class Person(name: String, age: Int)
- // Create an RDD of Person objects and register it as a table.
- val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt))
- people.registerTempTable("people")
- // SQL statements can be run by using the sql methods provided by sqlContext.
- val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
- // The results of SQL queries are SchemaRDDs and support all the normal RDD operations.
- // The columns of a row in the result can be accessed by ordinal.
- teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
复制代码
编程方式指定的schema当case类不能被提前定义(如数据被编码成字符串或文本数据集将被解析),这时可以以编程方式来创建SchemaRDD:
1.从原始的RDD创建行的RDD
2.创建匹配第一步创建的RDD中的行结构的StructType
3.使用SQLContext的applySchema方法将schema应用到行RDD中 - // sc is an existing SparkContext.
- val sqlContext = new org.apache.spark.sql.SQLContext(sc)
- // Create an RDD
- val people = sc.textFile("examples/src/main/resources/people.txt")
- // The schema is encoded in a string
- val schemaString = "name age"
- // Import Spark SQL data types and Row.
- import org.apache.spark.sql._
- // Generate the schema based on the string of schema
- val schema =
- StructType(
- schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
- // Convert records of the RDD (people) to Rows.
- val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))
- // Apply the schema to the RDD.
- val peopleSchemaRDD = sqlContext.applySchema(rowRDD, schema)
- // Register the SchemaRDD as a table.
- peopleSchemaRDD.registerTempTable("people")
- // SQL statements can be run by using the sql methods provided by sqlContext.
- val results = sqlContext.sql("SELECT name FROM people")
- // The results of SQL queries are SchemaRDDs and support all the normal RDD operations.
- // The columns of a row in the result can be accessed by ordinal.
- results.map(t => "Name: " + t(0)).collect().foreach(println)
复制代码
parquet文件parquet是柱状格式数据,它可以被其他很多的数据处理系统支持,sparkSQL支持读写parquet文件以自动保护原始数据的schema。 编程方式导入数据例子如下: - // sqlContext from the previous example is used in this example.
- // createSchemaRDD is used to implicitly convert an RDD to a SchemaRDD.
- import sqlContext.createSchemaRDD
- val people: RDD[Person] = ... // An RDD of case class objects, from the previous example.
- // The RDD is implicitly converted to a SchemaRDD by createSchemaRDD, allowing it to be stored using Parquet.
- people.saveAsParquetFile("people.parquet")
- // Read in the parquet file created above. Parquet files are self-describing so the schema is preserved.
- // The result of loading a Parquet file is also a SchemaRDD.
- val parquetFile = sqlContext.parquetFile("people.parquet")
- //Parquet files can also be registered as tables and then used in SQL statements.
- parquetFile.registerTempTable("parquetFile")
- val teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
- teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
复制代码
配置parquet的配置可通过SQLContext的setConf方法或通过运行SET key=value方式完成。 属性名 | 默认值 | 注释 | spark.sql.parquet.binaryAsString | false | 一些其他的支持parquet的系统如impala或旧版本的spark,在写出parquet schema的时候不会区分二进制数据和字符串。该配置选项告诉sparkSQL将字符串解析为二进制数据以使兼容这些系统 | spark.sql.parquet.cacheMetadata | false | 打开缓存parquet schema元数据会提升静态数据的查询速度 | spark.sql.parquet.compression.codec | snappy | 设置写parquet文件的压缩方式,包括:uncompressed,snappy,gzip,lzo | json数据集
sparkSQL可以通过json数据集推断出schema并将该schema转换为SchemaRDD,该转换可以通过SQLContext的以下两个方法实现: jsonFile-从json文件所在目录中导入数据,文件的每行必须是一个json对象jsonRdd-从已存在的RDD导入数据,RDD中的每个元素是包含json对象的字符串 - // sc is an existing SparkContext.
- val sqlContext = new org.apache.spark.sql.SQLContext(sc)
- // A JSON dataset is pointed to by path.
- // The path can be either a single text file or a directory storing text files.
- val path = "examples/src/main/resources/people.json"
- // Create a SchemaRDD from the file(s) pointed to by path
- val people = sqlContext.jsonFile(path)
- // The inferred schema can be visualized using the printSchema() method.
- people.printSchema()
- // root
- // |-- age: IntegerType
- // |-- name: StringType
- // Register this SchemaRDD as a table.
- people.registerTempTable("people")
- // SQL statements can be run by using the sql methods provided by sqlContext.
- val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
- // Alternatively, a SchemaRDD can be created for a JSON dataset represented by
- // an RDD[String] storing one JSON object per string.
- val anotherPeopleRDD = sc.parallelize(
- """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
- val anotherPeople = sqlContext.jsonRDD(anotherPeopleRDD)
复制代码
hive表sparkSQL同时也支持读写存储在hive中数据,然而,由于hive有很多的依赖,所以在使用hive前需要运行sbt/sbt -Phive assembly/assembly来编译一个包含hive的jar包。这个jar包也需要放到所有的worker节点上。
配置hive的一些参数需要放在conf目录的hive-site.xml文件中。
当需要使用hive则必须构造一个HiveContext,它继承于SQLContext,当用户没有部署过hive也可以创建HiveContext,当没有配置hive-site.xml文件,那么context会自动在当前目录中创建metastore_db和warehouse。 - // sc is an existing SparkContext.
- val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
- sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
- sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
- // Queries are expressed in HiveQL
- sqlContext.sql("FROM src SELECT key, value").collect().foreach(println)
复制代码
性能优化对于很多情况来说需要使用将数据缓存在内存中或使用一些试验性选项来优化的方式来提高性能。 将数据缓存在内存sparkSQL可以通过cacheTable("tableName")方法将表缓存在内存中,这样sparkSQL将只会扫描需要的列并将自动优化压缩以最小化内存使用和GC压力。使用uncacheTable("tableName")将表从内存移除。如果使用cache,那么表将不会使用柱状格式缓存在内存中,所以推荐使用cacheTable方法。
相关的一些参数如下: 属性名 | 默认值 | 注释 | spark.sql.inMemoryColumnarStorage.compressed | false | 当为true时,sparkSQL会基于数据的统计情况自动选择一种压缩方式来压缩每列数据 | spark.sql.inMemoryColumnarStorage.batchSize | 1000 | 控制缓存时的大小,较大的值可以提高内存使用率和压缩效率,但会出现OOM |
其他的配置选项下面的一些参数也可以用来优化查询性能: 属性名 | 默认值 | 注释 | spark.sql.autoBroadcastJoinThreshold | 10000 | 当处理join查询时广播到每个worker的表的最大字节数,当设置为-1广播功能将失效 | spark.sql.codegen | false | 当为true时,那么会在运行时动态生成指定查询的表达式,对于那些拥有复杂表达式的查询来说,该选项会导致明显的速度提升,然而对于简单的查询该选项会减慢查询速度 | spark.sql.shuffle.partitions | 200 | 配置在处理join或aggregation时shuffle数据的partition数量 |
其他sql接口运行thrift jdbc server运行如下命令即可启动jdbc server: - ./sbin/start-thriftserver.sh
复制代码
该命令接受所有bin/spark-submit的命令行参数,另外还可通过--hiveconf参数指定hive的属性文件,默认该服务会监听localhost:10000,可通过以下两种方式进行修改: - export HIVE_SERVER2_THRIFT_PORT=<listening-port>
- export HIVE_SERVER2_THRIFT_BIND_HOST=<listening-host>
- ./sbin/start-thriftserver.sh --master <master-uri> ...
复制代码
- ./sbin/start-thriftserver.sh --hiveconf hive.server2.thrift.port=<listening-port> --hiveconf hive.server2.thrift.bind.host=<listening-host> --master <master-uri> ...
复制代码
现在就可以使用下面命令进行测试了: 复制代码
然后在beeline命令行模式下运行!connect jdbc:hive2://localhost:10000,beeline将会提示你输入用户名和密码,具体可查看 beeline文档。
运行sparkSQL CLIsparkSQL CLI可以在本地模式运行hive metastore并执行通过命令行输入的查询语句,sparkSQL CLI不能与thrift jdbc服务进行通信。下面代码可以启动sparkSQL CLI: 复制代码
与其他系统兼容shark用户迁移指南调度通过SET spark.sql.thriftserver.scheduler.pool=accounting;来设置jdbc客户端会话的公平调度池。
reducer数量在shark中,默认的reducer数量为1并且通过mapred.reduce.tasks属性进行设置,sparkSQL已经废弃了该属性,以spark.sql.shuffle.partitions(默认值为200)代替,通过以下代码进行设置: - SET spark.sql.shuffle.partitions=10;
- SELECT page, count(*) c
- FROM logs_last_month_cached
- GROUP BY page ORDER BY c DESC LIMIT 10;
复制代码
还可以将该属性放在hive-site.xml文件中覆盖默认值。到目前,mapred.reduce.tasks属性还可以使用,会被自动转换为spark.sql.shuffle.partitions。
缓存spark.cache表属性也不再可用了,任何以_cached结尾的表名也不会被缓存了,现在通过CACHE TABLE和UNCACHE TABLE两个语句来控制表缓存: - CACHE TABLE logs_last_month;
- UNCACHE TABLE logs_last_month;
复制代码
注意:CACHE TABLE tbl是懒加载的,类似RDD中的.cache方法,该命令只是将tbl标记为当被计算的时候确保partitions被缓存,只有等tbl真正被执行的时候才会缓存。如果希望立即被缓存,使用如下方式: - CACHE TABLE logs_last_month;
- SELECT COUNT(1) FROM logs_last_month;
复制代码
与hive兼容sparkSQL可以与hive metastore,serdes和UDF兼容。sparkSQL thrift jdbc服务能够与当前已经安装的hive进行兼容,不需要修改已存在的hive metastore或者改变数据存放目录。
支持的hive特性
- sparkSQL支持很多hive特性,如:
- hive 查询语句:select,group by,order by,cluster by,sort by
- 所有hive操作:关系型操作(=, ⇔, ==, <>, <, >, >=, <=等),算术操作(+, -, *, /, %等),裸机操作(AND, &&, OR, ||等),复杂类型构造,数学函数(sign, ln, cos等),字符串函数(instr, length, printf等)
- 用户定义函数(UDF)
- 用户定义聚合函数(UDAF)
- 用户定义序列化格式(serdes)
- join:join,{left|right|full} outer join,left semi join,cross join
- 联合查询
- 子查询:select col from(select a+b as col from t1)t2
- 取样操作
- 解释操作
- 表分割
- 所有的hive DDL函数:create table,create table as select,alter table
- 大部分的hive数据类型:TINYINT,SMALLINT,INT,BIGINT,BOOLEAN,FLOAT,DOUBLE,STRING,BINARY,TIMESTAMP,ARRAY<>,MAP<>,STRUCT<>
不支持的hive功能
主要的hive特性
sparkSQL目前不支持使用动态分片向表插入数据sparkSQL不支持带有桶的表,桶是hive表分片的hash分片方式
深奥的hive特性
hive中带有分片的表使用不同的输入格式,在sparkSQL中,所有的表分片使用相同的输入格式hive支持在join操作使用non-equi的条件(如key<10),在sparkSQL中如果使用这种方式会输出错误的结果UNION和DATE类型唯一join单查询多插入sparkSQL不支持piggyback浏览来收集列统计,只支持操作hive metastore中的sizeInBytes字段
hive输入输出格式
sparkSQL只支持TextOutputFormathadoop归档文件
hive优化很多的hive优化目前都不能用户sparkSQL,其中一些(如索引)对于sparkSQL不是很重要的,因为sparkSQL是内存计算模型,其他的一些会在未来的sparkSQL版本中得到支持: 块级别的bitmap索引和虚拟字段(用来建立索引)自动将join转换为map join:在大表与很多小表进行join时,hive会自动将join转换为map join,下个版本的sparkSQL会得到支持自动决定join和groupby时reducer的数量,目前,sparkSQL需要使用SET spark.sql.shuffle.partitions=[num_tasks];来控制并行度只查询元数据:hive在查询时可以只查询元数据,sparkSQL需要部署任务来计算结果sparkSQL不支持hive中的skew data flagsparkSQL不支持hive的STREAMTABLE合并多个小文件作为查询结果:如果查询结果包括很多的小文件,hive可以合并这些小文件为大文件避免HDFS元数据容量溢出。sparkSQL暂时不支持该特性
书写语言集成的关系型查询语言集成查询是高级特性并只在scala中支持。如: - // sc is an existing SparkContext.
- val sqlContext = new org.apache.spark.sql.SQLContext(sc)
- // Importing the SQL context gives access to all the public SQL functions and implicit conversions.
- import sqlContext._
- val people: RDD[Person] = ... // An RDD of case class objects, from the first example.
- // The following is the same as 'SELECT name FROM people WHERE age >= 10 AND age <= 19'
- val teenagers = people.where('age >= 10).where('age <= 19).select('name)
- teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
复制代码
使用'符号会隐式将相关语句转换为sql表达式,相关详细信息可查看 scalaDoc。
sparkSQL数据类型
数字类型:Byte,Short,Integer,Long,Float,Double,Decimal字符串类型:String二进制类型:Binary布尔类型:Boolean日期类型:Timestamp复杂类型:Array(elementType,containsNull),表示一系列的elementType类型的数组,containsNull表示数组中的值是否允许为null;Map(keyType,valueType,valueContainsNull):表示一系列的key-value值对;Struct(fields):表示由一系列的StructFields组成的结构体,StructFields的定义为StructField(name,dataType,nullable) 所有的sparkSQL数据类型都在org.apache.spark.sql包中,通过import org.apache.spark.sql._来导入所有的数据类型。
数据类型 | scala中值类型 | 访问api或创建数据类型 | ByteType | Byte | ByteType | ShortType | Short | ShortType | IntegerType | Int | IntegerType | LongType | Long | LongType | FloatType | Float | FloatType | Double | Double | DoubleType | DecimalType | scala.math.sql.BigDecimal | DecimalType | StringType | String | StringType | BinaryType | Array[Byte] | BinaryType | BooleanType | Boolean | BooleanType | TimestampType | java.sql.Timestamp | TimestampType | ArrayType | scala.collection.Seq | ArrayType(elementType,[containsNull]),containsNull默认值为false | MapType | scala.collection.Map | MapType(keyType,valueType,[valueContainsNull]),valueContainsNull默认为true | StructType | org.apache.spark.sql.Row | StructType(fields),fields是StructFields的Seq,两个名字相同的fields是不允许的 | StructField | 字段数据类型在scala的值类型 | StructField(name,dataType,nullable) |
|