分享

Spark SQL之External DataSource外部数据源(二)源码分析

本帖最后由 nettman 于 2015-4-20 23:59 编辑
问题导读:
1、注册外部数据源的表的流程是什么?
2、Spark SQL解析SQL流程是什么?
3、我们所支持4种BaseRelation分别是?




接上篇:Spark SQL之External DataSource外部数据源(一)示例

上周Spark1.2刚发布,周末在家没事,把这个特性给了解一下,顺便分析下源码,看一看这个特性是如何设计及实现的。  

一、Sources包核心   
Spark SQL在Spark1.2中提供了External DataSource API,开发者可以根据接口来实现自己的外部数据源,如avro, csv, json, parquet等等。
    在Spark SQL源代码的org/spark/sql/sources目录下,我们会看到关于External DataSource的相关代码。这里特别介绍几个:
    1、DDLParser
    专门负责解析外部数据源SQL的SqlParser,解析create temporary table xxx using options (key 'value', key 'value') 创建加载外部数据源表的语句。
  1.     protected lazy val createTable: Parser[LogicalPlan] =  
  2.        CREATE ~ TEMPORARY ~ TABLE ~> ident ~ (USING ~> className) ~ (OPTIONS ~> options) ^^ {  
  3.          case tableName ~ provider ~ opts =>  
  4.            CreateTableUsing(tableName, provider, opts)  
  5.        }  
复制代码
  
2、CreateTableUsing
   一个RunnableCommand,通过反射从外部数据源lib中实例化Relation,然后注册到为temp table。
  1.     private[sql] case class CreateTableUsing(  
  2.         tableName: String,  
  3.         provider: String,  // org.apache.spark.sql.json   
  4.         options: Map[String, String]) extends RunnableCommand {  
  5.       
  6.       def run(sqlContext: SQLContext) = {  
  7.         val loader = Utils.getContextOrSparkClassLoader  
  8.         val clazz: Class[_] = try loader.loadClass(provider) catch { //do reflection  
  9.           case cnf: java.lang.ClassNotFoundException =>  
  10.             try loader.loadClass(provider + ".DefaultSource") catch {  
  11.               case cnf: java.lang.ClassNotFoundException =>  
  12.                 sys.error(s"Failed to load class for data source: $provider")  
  13.             }  
  14.         }  
  15.         val dataSource = clazz.newInstance().asInstanceOf[org.apache.spark.sql.sources.RelationProvider] //json包DefaultDataSource  
  16.         val relation = dataSource.createRelation(sqlContext, new CaseInsensitiveMap(options))//创建JsonRelation  
  17.       
  18.         sqlContext.baseRelationToSchemaRDD(relation).registerTempTable(tableName)//注册  
  19.         Seq.empty  
  20.       }  
  21.     }  
复制代码

  3、DataSourcesStrategy
    在 Strategy 一文中,我已讲过Streategy的作用,用来Plan生成物理计划的。这里提供了一种专门为了解析外部数据源的策略。
    最后会根据不同的BaseRelation生产不同的PhysicalRDD。不同的BaseRelation的scan策略下文会介绍。
  1.     private[sql] object DataSourceStrategy extends Strategy {  
  2.       def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {  
  3.         case PhysicalOperation(projectList, filters, l @ LogicalRelation(t: CatalystScan)) =>  
  4.           pruneFilterProjectRaw(  
  5.             l,  
  6.             projectList,  
  7.             filters,  
  8.             (a, f) => t.buildScan(a, f)) :: Nil  
  9.         ......  
  10.         case l @ LogicalRelation(t: TableScan) =>  
  11.           execution.PhysicalRDD(l.output, t.buildScan()) :: Nil  
  12.       
  13.         case _ => Nil  
  14.       }  
复制代码

   4、interfaces.scala     该文件定义了一系列可扩展的外部数据源接口,对于想要接入的外部数据源,我们只需实现该接口即可。里面比较重要的trait RelationProvider 和 BaseRelation,下文会详细介绍。
   5、filters.scala
    该Filter定义了如何在加载外部数据源的时候,就进行过滤。注意哦,是加载外部数据源到Table里的时候,而不是Spark里进行filter。这个有点像hbase的coprocessor,查询过滤在Server上就做了,不在Client端做过滤。
  6、LogicalRelation
   封装了baseRelation,继承了catalyst的LeafNode,实现MultiInstanceRelation。
    20141221212410668.png

二、External DataSource注册流程
用spark sql下sql/json来做示例, 画了一张流程图,如下:
20141222083247328.png

注册外部数据源的表的流程:
1、提供一个外部数据源文件,比如json文件。
2、提供一个实现了外部数据源所需要的interfaces的类库,比如sql下得json包,在1.2版本后改为了External Datasource实现。
3、引入SQLContext,使用DDL创建表,如create temporary table xxx using options (key 'value', key 'value')
4、External Datasource的DDLParser将对该SQL进行Parse
5、Parse后封装成为一个CreateTableUsing类的对象。该类是一个RunnableCommand,其run方法会直接执行创建表语句。
6、该类会通过反射来创建一个org.apache.spark.sql.sources.RelationProvider,该trait定义要createRelation,如json,则创建JSONRelation,若avro,则创建AvroRelation。
7、得到external releation后,直接调用SQLContext的baseRelationToSchemaRDD转换为SchemaRDD
8、最后registerTempTable(tableName) 来注册为Table,可以用SQL来查询了。

三、External DataSource解析流程
先看图,图如下:
20141221233625934.png

Spark SQL解析SQL流程如下:
1、Analyzer通过Rule解析,将UnresolvedRelation解析为JsonRelation。
2、通过Parse,Analyzer,Optimizer最后得到JSONRelation(file:///path/to/shengli.json,1.0)  
3、通过sources下得DataSourceStrategy将LogicalPlan映射到物理计划PhysicalRDD。
4、PhysicalRDD里包含了如何查询外部数据的规则,可以调用execute()方法来执行Spark查询。

四、External Datasource Interfaces
在第一节我已经介绍过,主要的interfaces,主要看一下BaseRelation和RelationProvider。
如果我们要实现一个外部数据源,比如avro数据源,支持spark sql操作avro file。那么久必须定义AvroRelation来继承BaseRelation。同时也要实现一个RelationProvider。
20141221235230593.png

BaseRelation:
是外部数据源的抽象,里面存放了schema的映射,和如何scan数据的规则。
  1.     abstract class BaseRelation {  
  2.       def sqlContext: SQLContext  
  3.       def schema: StructType  
复制代码
  1.     abstract class PrunedFilteredScan extends BaseRelation {  
  2.       def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row]  
  3.     }  
复制代码
1、schema我们如果自定义Relation,必须重写schema,就是我们必须描述对于外部数据源的Schema。
2、buildScan我们定义如何查询外部数据源,提供了4种Scan的策略,对应4种BaseRelation。

我们支持4种BaseRelation,分为TableScan, PrunedScan,PrunedFilterScan,CatalystScan。
   1、TableScan:
          默认的Scan策略。
   2、PrunedScan:
          这里可以传入指定的列,requiredColumns,列裁剪,不需要的列不会从外部数据源加载。
   3、PrunedFilterScan:
          在列裁剪的基础上,并且加入Filter机制,在加载数据也的时候就进行过滤,而不是在客户端请求返回时做Filter。
   4、CatalystScan:
           Catalyst的支持传入expressions来进行Scan。支持列裁剪和Filter。

RelationProvider:
我们要实现这个,接受Parse后传入的参数,来生成对应的External Relation,就是一个反射生产外部数据源Relation的接口。
  1.     trait RelationProvider {  
  2.       /**
  3.        * Returns a new base relation with the given parameters.
  4.        * Note: the parameters' keywords are case insensitive and this insensitivity is enforced
  5.        * by the Map that is passed to the function.
  6.        */  
  7.       def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation  
  8.     }  
复制代码

五、External Datasource定义示例
在Spark1.2之后,json和parquet也改为通过实现External API来进行外部数据源查询的。
下面以json的外部数据源定义为示例,说明是如何实现的:
20141222004022027.png

1、JsonRelation
定义处理对于json文件的,schema和Scan策略,均基于JsonRDD,细节可以自行阅读JsonRDD。
  1.     private[sql] case class JSONRelation(fileName: String, samplingRatio: Double)(  
  2.         @transient val sqlContext: SQLContext)  
  3.       extends TableScan {  
  4.       
  5.       private def baseRDD = sqlContext.sparkContext.textFile(fileName) //读取json file  
  6.       
  7.       override val schema =  
  8.         JsonRDD.inferSchema(  // jsonRDD的inferSchema方法,能自动识别json的schema,和类型type。  
  9.           baseRDD,  
  10.           samplingRatio,  
  11.           sqlContext.columnNameOfCorruptRecord)  
  12.       
  13.       override def buildScan() =  
  14.         JsonRDD.jsonStringToRow(baseRDD, schema, sqlContext.columnNameOfCorruptRecord) //这里还是JsonRDD,调用jsonStringToRow查询返回Row  
  15.     }  
复制代码

2、DefaultSource
parameters中可以获取到options中传入的path等自定义参数。

这里接受传入的参数,来狗仔JsonRelation。
  1.     private[sql] class DefaultSource extends RelationProvider {  
  2.       /** Returns a new base relation with the given parameters. */  
  3.       override def createRelation(  
  4.           sqlContext: SQLContext,  
  5.           parameters: Map[String, String]): BaseRelation = {  
  6.         val fileName = parameters.getOrElse("path", sys.error("Option 'path' not specified"))  
  7.         val samplingRatio = parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0)  
  8.       
  9.         JSONRelation(fileName, samplingRatio)(sqlContext)  
  10.       }  
  11.     }  
复制代码

六、总结
  External DataSource源码分析下来,可以总结为3部分。
  1、外部数据源的注册流程
  2、外部数据源Table查询的计划解析流程
  3、如何自定义一个外部数据源,重写BaseRelation定义外部数据源的schema和scan的规则。定义RelationProvider,如何生成外部数据源Relation。

  External Datasource此部分API还有可能在后续的build中改动,目前只是涉及到了查询,关于其它的操作还未涉及。

资料来源:http://blog.csdn.net/oopsoom/article/details/42064075


欢迎加入about云群371358502、39327136,云计算爱好者群,亦可关注about云腾讯认证空间||关注本站微信

没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条