分享

Spark SQL编程指南

问题导读
1、什么是JavaSQLContext类?
2、Spark SQL如何推断一个JSON数据集的schema?
3、如何理解Spark SQL支持表的类型JavaBeans的RDD?





简介
Spark SQL支持在Spark中执行SQL,或者HiveQL的关系查询表达式。它的核心组件是一个新增的RDD类型JavaSchemaRDD。JavaSchemaRDD由Row对象和表述这个行的每一列的数据类型的schema组成。一个JavaSchemaRDD类似于传统关系数据库的一个表。JavaSchemaRDD可以通过一个已存在的RDD,Parquet文件,JSON数据集,或者通过运行HiveSQL获得存储在Apache Hive上的数据创建。

Spark SQL目前是一个alpha组件。尽管我们会尽量减少API变化,但是一些API任然后再以后的发布中改变。

入门
在Spark中,所有关系函数功能的入口点是JavaSQLContext类。或者他的子类。要创建一个基本的JavaSQLContext,所有你需要的只是一个JavaSparkContext。
  1. JavaSparkContext sc = ...; // An existing JavaSparkContext.
  2. JavaSQLContext sqlContext = new org.apache.spark.sql.api.java.JavaSQLContext(sc);
复制代码


数据源
Spark SQL支持通过JavaSchemaRDD接口操作各种各样的数据源。一单一个数据集被加载,它可以被注册成一个表,甚至和来自其他源的数据连接。

RDDs
Spark SQL支持的表的其中一个类型是由JavaBeans的RDD。BeanInfo定义了这个表的schema。现在 ,Spark SQL 不支持包括嵌套或者复杂类型例如Lists或者Arrays的JavaBeans。你可以通过创建一个实现了Serializable并且它的所有字段都有getters和setters方法的类类创建一个JavaBeans。
  1. public static class Person implements Serializable {
  2.   private String name;
  3.   private int age;
  4.   public String getName() {
  5.     return name;
  6.   }
  7.   public void setName(String name) {
  8.     this.name = name;
  9.   }
  10.   public int getAge() {
  11.     return age;
  12.   }
  13.   public void setAge(int age) {
  14.     this.age = age;
  15.   }
  16. }
复制代码


一个schema可以被应用在一个已存在的RDD上,通过调用applySchema并且提供这个JavaBean的类对象。

  1. // sc is an existing JavaSparkContext.
  2. JavaSQLContext sqlContext = new org.apache.spark.sql.api.java.JavaSQLContext(sc)
  3. // Load a text file and convert each line to a JavaBean.
  4. JavaRDD<Person> people = sc.textFile("examples/src/main/resources/people.txt").map(
  5.   new Function<String, Person>() {
  6.     public Person call(String line) throws Exception {
  7.       String[] parts = line.split(",");
  8.       Person person = new Person();
  9.       person.setName(parts[0]);
  10.       person.setAge(Integer.parseInt(parts[1].trim()));
  11.       return person;
  12.     }
  13.   });
  14. // Apply a schema to an RDD of JavaBeans and register it as a table.
  15. JavaSchemaRDD schemaPeople = sqlContext.applySchema(people, Person.class);
  16. schemaPeople.registerAsTable("people");
  17. // SQL can be run over RDDs that have been registered as tables.
  18. JavaSchemaRDD teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
  19. // The results of SQL queries are SchemaRDDs and support all the normal RDD operations.
  20. // The columns of a row in the result can be accessed by ordinal.
  21. List<String> teenagerNames = teenagers.map(new Function<Row, String>() {
  22.   public String call(Row row) {
  23.     return "Name: " + row.getString(0);
  24.   }
  25. }).collect();
复制代码

注意,Spark SQL目前使用一个非常简单的SQL解析器。用户如果想获得一个更加完整的SQL方言,应该看看HiveContext提供的HiveQL支持。


Parquet Files
Parquet是一个columnar格式,并且被许多其他数据处理系统支持。Spark SQL对读写Parquet文件提供支持,并且自动保存原始数据的Schema。通过下面的例子使用数据:

  1. // sqlContext from the previous example is used in this example.
  2. JavaSchemaRDD schemaPeople = ... // The JavaSchemaRDD from the previous example.
  3. // JavaSchemaRDDs can be saved as Parquet files, maintaining the schema information.
  4. schemaPeople.saveAsParquetFile("people.parquet");
  5. // Read in the Parquet file created above.  Parquet files are self-describing so the schema is preserved.
  6. // The result of loading a parquet file is also a JavaSchemaRDD.
  7. JavaSchemaRDD parquetFile = sqlContext.parquetFile("people.parquet");
  8. //Parquet files can also be registered as tables and then used in SQL statements.
  9. parquetFile.registerAsTable("parquetFile");
  10. JavaSchemaRDD teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19");
  11. List<String> teenagerNames = teenagers.map(new Function<Row, String>() {
  12.   public String call(Row row) {
  13.     return "Name: " + row.getString(0);
  14.   }
  15. }).collect();
复制代码



JSON Datasets
Spark SQL可以自动推断一个JSON数据集的schema,并加载成一个JavaSchemaRDD。这个转换可以通过JavaSQLContext中的两个方法中的一个完成:

jsonFile -从一个目录下的文件中加载数据,这个文件中的每一行都是一个JSON对象。

jsonRdd -从一个已存在的RDD加载数据,这个RDD中的每一个元素是一个包含一个JSON对象的String。

  1. // sc is an existing JavaSparkContext.
  2. JavaSQLContext sqlContext = new org.apache.spark.sql.api.java.JavaSQLContext(sc);
  3. // A JSON dataset is pointed to by path.
  4. // The path can be either a single text file or a directory storing text files.
  5. String path = "examples/src/main/resources/people.json";
  6. // Create a JavaSchemaRDD from the file(s) pointed to by path
  7. JavaSchemaRDD people = sqlContext.jsonFile(path);
  8. // The inferred schema can be visualized using the printSchema() method.
  9. people.printSchema();
  10. // root
  11. //  |-- age: IntegerType
  12. //  |-- name: StringType
  13. // Register this JavaSchemaRDD as a table.
  14. people.registerAsTable("people");
  15. // SQL statements can be run by using the sql methods provided by sqlContext.
  16. JavaSchemaRDD teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");
  17. // Alternatively, a JavaSchemaRDD can be created for a JSON dataset represented by
  18. // an RDD[String] storing one JSON object per string.
  19. List<String> jsonData = Arrays.asList(
  20.   "{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}");
  21. JavaRDD<String> anotherPeopleRDD = sc.parallelize(jsonData);
  22. JavaSchemaRDD anotherPeople = sqlContext.jsonRDD(anotherPeopleRDD);
复制代码


Hive Tables
Spark SQL也支持读和写存储在apache Hive中的数据。然而,由于Hive有一个非常大的依赖,他没有在Spark默认宝中包括。为了使用Hive,你必须运行‘SPARK_HIVE=true sbt/sbt assembly/assembly'(或者对Maven使用 -Phive)。这个命令构建一个包含Hive的assembly。注意,这个Hive assembly 必须放在所有的工作节点上,因为它们需要访问Hive的序列化和方序列化包(SerDes),以此访问存储在Hive中的数据。

可以通过conf目录下的hive-site.xml文件完成Hive配置 。

要和Hive配合工作,你需要构造一个JavaHiveContext,它继承了JavaSQLContext,并且添加了发现MetaStore中的表和使用HiveQL编写查询的功能。此外,除了sql方法,JavaHiveContext方法还提供了一个hql方法,它允许查询使用HiveQL表达。


Writing Language-Integrated Relational Queries
Language-Integrated查询目前只在Scala中被支持。

Spark SQL同样支持使用领域特定的语言来编写查询。再次,使用上面例子中的数据:
  1. // sc is an existing SparkContext.
  2. val sqlContext = new org.apache.spark.sql.SQLContext(sc)
  3. // Importing the SQL context gives access to all the public SQL functions and implicit conversions.
  4. import sqlContext._
  5. val people: RDD[Person] = ... // An RDD of case class objects, from the first example.
  6. // The following is the same as 'SELECT name FROM people WHERE age >= 10 AND age <= 19'
  7. val teenagers = people.where('age >= 10).where('age <= 19).select('name)
  8. teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
复制代码


DSL使用Scala中得到标记来表示基础表中的表,他们使用一个前缀’标识。隐式转换这些标记为被SQL 执行引擎评估的表达式。支持这些功能的完成列表可以再ScalaDoc找到。

已有(2)人评论

跳转到指定楼层
anyhuayong 发表于 2014-9-13 07:37:48
好文章必须回复
回复

使用道具 举报

xin07020220 发表于 2014-9-13 22:23:00
真是来得太及时了,正想了解一下Spark SQL的相关知识!
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条