分享

Apache Spark源码走读之13 -- hiveql on spark实现详解

本帖最后由 pig2 于 2015-1-6 14:14 编辑

问题导读

1.Hive中有几种数据模型?
2.HiveQL的执行过程是怎样的?











概要
在新近发布的spark 1.0中新加了sql的模块,更为引人注意的是对hive中的hiveql也提供了良好的支持,作为一个源码分析控,了解一下spark是如何完成对hql的支持是一件非常有趣的事情。

Hive简介

Hive的由来
以下部分摘自Hadoop definite guide中的Hive一章
“Hive由Facebook出品,其设计之初目的是让精通SQL技能的分析师能够对Facebook存放在HDFS上的大规模数据集进行分析和查询。
Hive大大简化了对大规模数据集的分析门槛(不再要求分析人员具有很强的编程能力),迅速流行起来,成为Hadoop生成圈上的Killer Application. 目前已经有很多组织把Hive作为一个通用的,可伸缩数据处理平台。”

数据模型(Data Model)
Hive所有的数据都存在HDFS中,在Hive中有以下几种数据模型
  • Tables(表) table和关系型数据库中的表是相对应的,每个表都有一个对应的hdfs目录,表中的数据经序列化后存储在该目录,Hive同时支持表中的数据存储在其它类型的文件系统中,如NFS或本地文件系统
  • 分区(Partitions) Hive中的分区起到的作用有点类似于RDBMS中的索引功能,每个Partition都有一个对应的目录,这样在查询的时候,可以减少数据规模
  • 桶(buckets) 即使将数据按分区之后,每个分区的规模有可能还是很大,这个时候,按照关键字的hash结果将数据分成多个buckets,每个bucket对应于一个文件

Query Language
HiveQL是Hive支持的类似于SQL的查询语言。HiveQL大体可以分成下面两种类型
  • DDL(data definition language)  比如创建数据库(create database),创建表(create table),数据库和表的删除
  • DML(data manipulation language) 数据的添加,查询
  • UDF(user defined function) Hive还支持用户自定义查询函数

Hive architecture
hive的整体框架图如下图所示
032126369276759.jpg

由上图可以看出,Hive的整体架构可以分成以下几大部分
  • 用户接口  支持CLI, JDBC和Web UI
  • Driver Driver负责将用户指令翻译转换成为相应的MapReduce Job
  • MetaStore 元数据存储仓库,像数据库和表的定义这些内容就属于元数据这个范畴,默认使用的是Derby存储引擎

HiveQL执行过程
HiveQL的执行过程如下所述
  • parser 将HiveQL解析为相应的语法树
  • Semantic Analyser 语义分析
  • Logical Plan Generating 生成相应的LogicalPlan
  • Query Plan Generating
  • Optimizer
最终生成MapReduce的Job,交付给Hadoop的MapReduce计算框架具体运行。

Hive实例
最好的学习就是实战,Hive这一小节还是以一个具体的例子来结束吧。
前提条件是已经安装好hadoop,具体安装可以参考源码走读11或走读9

step 1: 创建warehouse
warehouse用来存储raw data
  1. $ $HADOOP_HOME/bin/hadoop fs -mkdir       /tmp
  2. $ $HADOOP_HOME/bin/hadoop fs -mkdir       /user/hive/warehouse
  3. $ $HADOOP_HOME/bin/hadoop fs -chmod g+w   /tmp
  4. $ $HADOOP_HOME/bin/hadoop fs -chmod g+w   /user/hive/warehouse
复制代码

step 2: 启动hive cli
  1. $ export HIVE_HOME=<hive-install-dir>
  2. $ $HIVE_HOME/bin/hive
复制代码

step 3: 创建表
创建表,首先将schema数据写入到metastore,另一件事情就是在warehouse目录下创建相应的子目录,该子目录以表的名称命名
  1. CREATE TABLE u_data (
  2.   userid INT,
  3.   movieid INT,
  4.   rating INT,
  5.   unixtime STRING)
  6. ROW FORMAT DELIMITED
  7. FIELDS TERMINATED BY '\t'
  8. STORED AS TEXTFILE;
复制代码

step 4: 导入数据
导入的数据会存储在step 3中创建的表目录下
  1. LOAD DATA LOCAL INPATH '/u.data'
  2. OVERWRITE INTO TABLE u_data;
复制代码

step 5: 查询
  1. SELECT COUNT(*) FROM u_data;
复制代码

hiveql on Spark
Q: 上一章节花了大量的篇幅介绍了hive由来,框架及hiveql执行过程。那这些东西跟我们标题中所称的hive on spark有什么关系呢?
Ans:  Hive的整体解决方案很不错,但有一些地方还值得改进,其中之一就是“从查询提交到结果返回需要相当长的时间,查询耗时太长”。之所以查询时间很长,一个主要的原因就是因为Hive原生是基于MapReduce的,哪有没有办法提高呢。您一定想到了,“不是生成MapReduce Job,而是生成Spark Job”, 充分利用Spark的快速执行能力来缩短HiveQl的响应时间。
下图是Spark 1.0中所支持的lib库,SQL是其唯一新添加的lib库,可见SQL在Spark 1.0中的地位之重要。

       041055085367623.png

HiveContext
HiveContext是Spark提供的用户接口,HiveContext继承自SqlContext。
让我们回顾一下,SqlContext中牵涉到的类及其间的关系如下图所示,具体分析过程参见本系列中的源码走读之11

      271444453061825.png
既然是继承自SqlContext,那么我们将普通sql与hiveql分析执行步骤做一个对比,可以得到下图。

      032221051925786.png
有了上述的比较,就能抓住源码分析时需要把握的几个关键点
  • Entrypoint           HiveContext.scala
  • QueryExecution    HiveContext.scala
    • parser       HiveQl.scala
    • optimizer


数据
使用到的数据有两种
  • Schema Data  像数据库的定义和表的结构,这些都存储在MetaStore中
  • Raw data        即要分析的文件本身
Entrypoint
hiveql是整个的入口点,而hql是hiveql的缩写形式。

  1. def hiveql(hqlQuery: String): SchemaRDD = {
  2.     val result = new SchemaRDD(this, HiveQl.parseSql(hqlQuery))
  3.     // We force query optimization to happen right away instead of letting it happen lazily like
  4.     // when using the query DSL.  This is so DDL commands behave as expected.  This is only
  5.     // generates the RDD lineage for DML queries, but does not perform any execution.
  6.     result.queryExecution.toRdd
  7.     result
  8.   }
复制代码

上述hiveql的定义与sql的定义几乎一模一样,唯一的不同是sql中使用parseSql的结果作为SchemaRDD的入参而hiveql中使用HiveQl.parseSql作为SchemaRdd的入参
HiveQL, parser
parseSql的函数定义如代码所示,解析过程中将指令分成两大类
  • nativecommand     非select语句,这类语句的特点是执行时间不会因为条件的不同而有很大的差异,基本上都能在较短的时间内完成
  • 非nativecommand  主要是select语句

  1. def parseSql(sql: String): LogicalPlan = {
  2.     try {
  3.       if (sql.toLowerCase.startsWith("set")) {
  4.         NativeCommand(sql)
  5.       } else if (sql.toLowerCase.startsWith("add jar")) {
  6.         AddJar(sql.drop(8))
  7.       } else if (sql.toLowerCase.startsWith("add file")) {
  8.         AddFile(sql.drop(9))
  9.       } else if (sql.startsWith("dfs")) {
  10.         DfsCommand(sql)
  11.       } else if (sql.startsWith("source")) {
  12.         SourceCommand(sql.split(" ").toSeq match { case Seq("source", filePath) => filePath })
  13.       } else if (sql.startsWith("!")) {
  14.         ShellCommand(sql.drop(1))
  15.       } else {
  16.         val tree = getAst(sql)
  17.         if (nativeCommands contains tree.getText) {
  18.           NativeCommand(sql)
  19.         } else {
  20.           nodeToPlan(tree) match {
  21.             case NativePlaceholder => NativeCommand(sql)
  22.             case other => other
  23.           }
  24.         }
  25.       }
  26.     } catch {
  27.       case e: Exception => throw new ParseException(sql, e)
  28.       case e: NotImplementedError => sys.error(
  29.         s"""
  30.           |Unsupported language features in query: $sql
  31.           |${dumpTree(getAst(sql))}
  32.         """.stripMargin)
  33.     }
  34.   }        
复制代码

哪些指令是nativecommand呢,答案在HiveQl.scala中的nativeCommands变量,列表很长,代码就不一一列出。
对于非nativeCommand,最重要的解析函数就是nodeToPlan

toRdd
Spark对HiveQL所做的优化主要体现在Query相关的操作,其它的依然使用Hive的原生执行引擎。
在logicalPlan到physicalPlan的转换过程中,toRdd最关键的元素
  1. override lazy val toRdd: RDD[Row] =
  2.       analyzed match {
  3.         case NativeCommand(cmd) =>
  4.           val output = runSqlHive(cmd)
  5.           if (output.size == 0) {
  6.             emptyResult
  7.           } else {
  8.             val asRows = output.map(r => new GenericRow(r.split("\t").asInstanceOf[Array[Any]]))
  9.             sparkContext.parallelize(asRows, 1)
  10.           }
  11.         case _ =>
  12.           executedPlan.execute().map(_.copy())
  13.       }
复制代码


native command的执行流程
由于native command是一些非耗时的操作,直接使用Hive中原有的exeucte engine来执行即可。这些command的执行示意图如下

041522094748036.png


analyzer
HiveTypeCoercion
  1. val typeCoercionRules =
  2.     List(PropagateTypes, ConvertNaNs, WidenTypes, PromoteStrings, BooleanComparisons, BooleanCasts,
  3.       StringToIntegralCasts, FunctionArgumentConversion)               
复制代码


optimizer
PreInsertionCasts存在的目的就是确保在数据插入执行之前,相应的表已经存在。
  1. override lazy val optimizedPlan =
  2.       optimizer(catalog.PreInsertionCasts(catalog.CreateTables(analyzed)))
复制代码

此处要注意的是catalog的用途,catalog是HiveMetastoreCatalog的实例。

HiveMetastoreCatalog是Spark中对Hive Metastore访问的wrapper。HiveMetastoreCatalog通过调用相应的Hive Api可以获得数据库中的表及表的分区,也可以创建新的表和分区。

040000418955269.png

HiveMetastoreCatalog
HiveMetastoreCatalog中会通过hive client来访问metastore中的元数据,使用了大量的Hive Api。其中包括了广为人知的deSer library。
以CreateTable函数为例说明对Hive Library的依赖。
  1. def createTable(
  2.       databaseName: String,
  3.       tableName: String,
  4.       schema: Seq[Attribute],
  5.       allowExisting: Boolean = false): Unit = {
  6.     val table = new Table(databaseName, tableName)
  7.     val hiveSchema =
  8.       schema.map(attr => new FieldSchema(attr.name, toMetastoreType(attr.dataType), ""))
  9.     table.setFields(hiveSchema)
  10.     val sd = new StorageDescriptor()
  11.     table.getTTable.setSd(sd)
  12.     sd.setCols(hiveSchema)
  13.     // TODO: THESE ARE ALL DEFAULTS, WE NEED TO PARSE / UNDERSTAND the output specs.
  14.     sd.setCompressed(false)
  15.     sd.setParameters(Map[String, String]())
  16.     sd.setInputFormat("org.apache.hadoop.mapred.TextInputFormat")
  17.     sd.setOutputFormat("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")
  18.     val serDeInfo = new SerDeInfo()
  19.     serDeInfo.setName(tableName)
  20.     serDeInfo.setSerializationLib("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")
  21.     serDeInfo.setParameters(Map[String, String]())
  22.     sd.setSerdeInfo(serDeInfo)
  23.     try client.createTable(table) catch {
  24.       case e: org.apache.hadoop.hive.ql.metadata.HiveException
  25.         if e.getCause.isInstanceOf[org.apache.hadoop.hive.metastore.api.AlreadyExistsException] &&
  26.            allowExisting => // Do nothing.
  27.     }
  28.   }
复制代码

实验
结合源码,我们再对一个简单的例子作下说明。
可能你会想,既然spark也支持hql,那么我原先用hive cli创建的数据库和表用spark能不能访问到呢?答案或许会让你很纳闷,“在默认的配置下是不行的”。为什么?
Hive中的meta data采用的存储引擎是Derby,该存储引擎只能有一个访问用户。同一时刻只能有一个人访问,即便以同一用户登录访问也不行。针对这个局限,解决方法就是将metastore存储在mysql或者其它可以多用户访问的数据库中。

具体实例
  • 创建表
  • 导入数据
  • 查询
  • 删除表
在启动spark-shell之前,需要先设置环境变量HIVE_HOME和HADOOP_HOME.

启动spark-shell之后,执行如下代码
  1. val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
  2. // Importing the SQL context gives access to all the public SQL functions and implicit conversions.
  3. import hiveContext._
  4. hql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
  5. hql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
  6. // Queries are expressed in HiveQL
  7. hql("FROM src SELECT key, value").collect().foreach(println)
  8. hql("drop table src")
复制代码

create操作会在/user/hive/warehouse/目录下创建src目录,可以用以下指令来验证
  1. $HADOOP_HOME/bin/hdfs dfs -ls /user/hive/warehouse/
复制代码

drop表的时候,不仅metastore中相应的记录被删除,而且原始数据raw file本身也会被删除,即在warehouse目录下对应某个表的目录会被整体删除掉。
上述的create, load及query操作对metastore和raw data的影响可以用下图的表示

041036076142815.png
hive-site.xml
如果想对hive默认的配置作修改,可以使用hive-site.xml。
具体步骤如下
-  在$SPARK_HOME/conf目录下创建hive-site.xml
-  根据需要,添写相应的配置项的值,可以这样做,将$HIVE_HOME/conf目录下的hive-default.xml复制到$SPARK_HOME/conf,然后重命名为hive-site.xml

Sql新功能预告
为了进一步提升sql的执行速度,在Spark开发团队在发布完1.0之后,会通过codegen的方法来提升执行速度。codegen有点类似于jvm中的jit技术。充分利用了scala语言的特性。

前景分析
Spark目前还缺乏一个非常有影响力的应用,也就通常所说的killer application。SQL是Spark在寻找killer application方面所做的一个积极尝试,也是目前Spark上最有热度的一个话题,但通过优化Hive执行速度来吸引潜在Spark用户,该突破方向选择正确与否还有待市场证明。
Hive除了在执行速度上为人诟病之外,还有一个最大的问题就是多用户访问的问题,相较第一个问题,第二个问题来得更为致命。无论是Facebook在Hive之后推出的Presto还是Cloudera推出的Impala都是针对第二问题提出的解决方案,目前都已经取得的了巨大优势。

小结
本文就Spark对HiveQL提供支持的这一功能进行了比较详细的分析,其中涉及到以下几个问题。
  • 什么是hive
  • hive有什么缺点,否则就没Spark或Shark啥事了
  • Spark主要是针对hive的哪个不足做出改进
  • Spark是如何对这个做改进的



相关内容


Apache Spark源码走读之1 -- Spark论文阅读笔记

Apache Spark源码走读之2 -- Job的提交与运行

Apache Spark源码走读之3-- Task运行期之函数调用关系分析

Apache Spark源码走读之4 -- DStream实时流数据处理

Apache Spark源码走读之5-- DStream处理的容错性分析

Apache Spark源码走读之6-- 存储子系统分析

Apache Spark源码走读之7 -- Standalone部署方式分析

Apache Spark源码走读之8 -- Spark on Yarn

Apache Spark源码走读之9 -- Spark源码编译

Apache Spark源码走读之10 -- 在YARN上运行SparkPi

Apache Spark源码走读之11 -- sql的解析与执行

Apache Spark源码走读之12 -- Hive on Spark运行环境搭建


Apache Spark源码走读之14 -- Graphx实现剖析

Apache Spark源码走读之15 -- Standalone部署模式下的容错性分析

Apache Spark源码走读之16 -- spark repl实现详解

Apache Spark源码走读之17 -- 如何进行代码跟读

Apache Spark源码走读之18 -- 使用Intellij idea调试Spark源码

Apache Spark源码走读之19 -- standalone cluster模式下资源的申请与释放

Apache Spark源码走读之20 -- ShuffleMapTask计算结果的保存与读取

Apache Spark源码走读之21 -- WEB UI和Metrics初始化及数据更新过程分析

Apache Spark源码走读之22 -- 浅谈mllib中线性回归的算法实现

Apache Spark源码走读之23 -- Spark MLLib中拟牛顿法L-BFGS的源码实现

Apache Spark源码走读之24 -- Sort-based Shuffle的设计与实现




参考资料





欢迎加入about云群90371779322273151432264021 ,云计算爱好者群,亦可关注about云腾讯认证空间||关注本站微信

没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条