
How to Use Spark SQL UDFs: A Step-by-Step Introduction

Guiding questions:
1. What is the User Defined Function (UDF) feature?
2. How do you define the UDFs you actually need in Spark SQL to process data?
3. How do you test the concat_str function?





Spark 1.1 introduced the User Defined Function (UDF) feature, which lets users define the UDFs they actually need in Spark SQL to process data.
    Spark SQL's built-in function set is currently quite limited and lacks common functions such as len, concat, etc., so implementing whatever your business logic requires with a UDF is very convenient.
   A Spark SQL UDF is really just a Scala function: Catalyst wraps it in an Expression node, and the eval method then computes the UDF's result from the current Row. For a source-code walkthrough, see: Spark SQL Catalyst源码分析之UDF (Spark SQL Catalyst source code analysis: UDF).
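
   To make the idea concrete, here is a simplified, self-contained sketch (not the actual Catalyst source; the class and method names are illustrative only) of how a registered Scala function can be wrapped in an expression node and evaluated against each row:

  // Simplified illustration only -- the real classes (ScalaUdf, Expression, Row)
  // live in org.apache.spark.sql.catalyst.expressions and are more involved.
  trait SimpleExpression {
    def eval(row: Seq[Any]): Any
  }

  // A column reference: pulls the value at a given ordinal out of the row.
  case class ColumnRef(ordinal: Int) extends SimpleExpression {
    def eval(row: Seq[Any]): Any = row(ordinal)
  }

  // The UDF node: evaluates its child expressions against the current row,
  // then applies the user's Scala function to the results.
  case class SimpleScalaUdf(function: AnyRef, children: Seq[SimpleExpression])
      extends SimpleExpression {
    def eval(row: Seq[Any]): Any = children.map(_.eval(row)) match {
      case Seq(a)    => function.asInstanceOf[Any => Any](a)
      case Seq(a, b) => function.asInstanceOf[(Any, Any) => Any](a, b)
      // ... the real implementation handles up to 22 arguments
    }
  }

  // "len(line)" applied to a row with a single string column:
  val lenUdf = SimpleScalaUdf((x: Any) => x.asInstanceOf[String].length, Seq(ColumnRef(0)))
  println(lenUdf.eval(Seq("# Apache Spark")))   // 14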

   Spark SQL UDFs are very easy to use; it takes just two steps:

   1. Registration
     Once SQLContext or HiveContext has been imported, the ability to register UDFs is available.
  registerFunction(udfName : String, func : FunctionN)

    Because of a Scala language limitation (the FunctionN traits only go up to Function22), a UDF can take at most 22 parameters.
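
    Registering a UDF with more than one argument looks just the same; the 22-argument cap simply comes from Scala defining only Function1 through Function22. A hypothetical sketch (the in_range name and logic are made up for illustration, assuming the same console environment used in the examples below):

  // Hypothetical three-argument UDF; any arity up to 22 works the same way.
  registerFunction("in_range", (x: Int, lo: Int, hi: Int) => x >= lo && x <= hi)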

   2. Usage
  select udfName(param1, param2, ...) from tableName



   3. Examples
Here we create two tables:
The first, dual, will read its records from README.md and has a single field, line : String.
The second, src, has two fields, key and value; its data is the test data that ships with Spark SQL.

We use sbt/sbt hive/console to enter the test environment.
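
The console brings a test HiveContext and its members into scope, which is why sql(...) and registerFunction(...) can be called directly in the sessions below. For reference, outside the console the same thing in a standalone application would look roughly like this (a minimal sketch assuming the Spark 1.1-era API, where registerFunction is a method on SQLContext/HiveContext):

  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.sql.hive.HiveContext

  object UdfExample {
    def main(args: Array[String]): Unit = {
      val sc = new SparkContext(new SparkConf().setAppName("udf-example"))
      val hiveContext = new HiveContext(sc)

      // Register the UDF under the name "len"; it is then visible to SQL queries.
      hiveContext.registerFunction("len", (x: String) => x.length)

      // Use the UDF like any built-in function.
      hiveContext.sql("select len(line) from dual").collect().foreach(println)

      sc.stop()
    }
  }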

Example 1: string length with len()
Create table dual:
  scala> sql("create table dual(line string)").collect()
  14/09/19 17:41:34 INFO metastore.HiveMetaStore: 0: create_table: Table(tableName:dual, dbName:default, owner:root, createTime:1411119694, lastAccessTime:0, retention:0, sd:StorageDescriptor(cols:[FieldSchema(name:line, type:string, comment:null)], location:null, inputFormat:org.apache.hadoop.mapred.TextInputFormat, outputFormat:org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat, compressed:false, numBuckets:-1, serdeInfo:SerDeInfo(name:null, serializationLib:org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, parameters:{serialization.format=1}), bucketCols:[], sortCols:[], parameters:{}, skewedInfo:SkewedInfo(skewedColNames:[], skewedColValues:[], skewedColValueLocationMaps:{}), storedAsSubDirectories:false), partitionKeys:[], parameters:{}, viewOriginalText:null, viewExpandedText:null, tableType:MANAGED_TABLE, privileges:PrincipalPrivilegeSet(userPrivileges:null, groupPrivileges:null, rolePrivileges:null))
  14/09/19 17:41:34 INFO HiveMetaStore.audit: ugi=root    ip=unknown-ip-addr      cmd=create_table: Table(tableName:dual, dbName:default, owner:root, createTime:1411119694, lastAccessTime:0, retention:0, sd:StorageDescriptor(cols:[FieldSchema(name:line, type:string, comment:null)], location:null, inputFormat:org.apache.hadoop.mapred.TextInputFormat, outputFormat:org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat, compressed:false, numBuckets:-1, serdeInfo:SerDeInfo(name:null, serializationLib:org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, parameters:{serialization.format=1}), bucketCols:[], sortCols:[], parameters:{}, skewedInfo:SkewedInfo(skewedColNames:[], skewedColValues:[], skewedColValueLocationMaps:{}), storedAsSubDirectories:false), partitionKeys:[], parameters:{}, viewOriginalText:null, viewExpandedText:null, tableType:MANAGED_TABLE, privileges:PrincipalPrivilegeSet(userPrivileges:null, groupPrivileges:null, rolePrivileges:null))


Load the README.md data:
  sql("load data local inpath 'README.md' into table dual ").collect()


  1. scala> sql("select * from dual").collect()  
  2. res4: Array[org.apache.spark.sql.Row] = Array([# Apache Spark], [], [Spark is a fast and general cluster computing system for Big Data. It provides], [high-level APIs in Scala, Java, and Python, and an optimized engine that], [supports general computation graphs for data analysis. It also supports a], [rich set of higher-level tools including Spark SQL for SQL and structured], [data processing, MLLib for machine learning, GraphX for graph processing,], [and Spark Streaming.], [], [<http://spark.apache.org/>], [], [], [## Online Documentation], [], [You can find the latest Spark documentation, including a programming], [guide, on the project webpage at <http://spark.apache.org/documentation.html>.], [This README file only contains basic setup instructions.], [], [## Building Spark], [], ...  


Write the len function and register it:
  scala> registerFunction("len",(x:String)=>x.length)



Test it:
  1. scala> sql("select len(line) from dual").collect()  
  2. 14/09/19 17:45:07 INFO spark.SparkContext: Job finished: collect at SparkPlan.scala:85, took 0.072239295 s  
  3. res6: Array[org.apache.spark.sql.Row] = Array([14], [0], [78], [72], [73], [73], [73], [20], [0], [26], [0], [0], [23], [0], [68], [78], [56], [0], [17], [0], [75], [0], [22], [0], [67], [0], [26], [0], [64], [0], [21], [0], [52], [0], [44], [0], [27], [0], [66], [0], [17], [4], [61], [0], [43], [0], [19], [0], [74], [74], [0], [29], [0], [32], [0], [75], [63], [67], [74], [72], [22], [0], [54], [0], [69], [0], [16], [0], [84], [17], [0], [19], [0], [31], [0], [77], [76], [77], [77], [0], [67], [27], [0], [25], [45], [0], [42], [58], [0], [91], [29], [0], [31], [58], [0], [42], [61], [0], [35], [52], [0], [77], [79], [74], [22], [0], [51], [0], [90], [0], [16], [42], [44], [30], [17], [0], [0], [56], [0], [46], [86], [78], [0], [30], [0], [16], [0], [97], [70], [0], [0], [24], [0], [78]...  
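
A registered UDF is not restricted to the select list; for example, it can also appear in a WHERE clause (a hypothetical query, not part of the original console session):

  scala> sql("select line from dual where len(line) > 70").collect()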



Example 2: string concatenation with concat_str
For simplicity, the example uses the src table's key and value columns, of types Int and String:
  1. scala> sql("desc src").collect()  
  2. res8: Array[org.apache.spark.sql.Row] = Array([key,int,null], [value,string,null])  
  scala> sql("select * from src limit 10").collect()
  res7: Array[org.apache.spark.sql.Row] = Array([238,val_238], [86,val_86], [311,val_311], [27,val_27], [165,val_165], [409,val_409], [255,val_255], [278,val_278], [98,val_98], [484,val_484])



Write and register the concat_str function:
  scala> registerFunction("concat_str",(a:Int, b:String)=>a.toString+b)



Test the concat_str function:
  scala> sql("select concat_str(key,value) from src ").collect()

  14/09/19 18:17:22 INFO spark.SparkContext: Job finished: collect at SparkPlan.scala:85, took 0.082076377 s
  res28: Array[org.apache.spark.sql.Row
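
As a closing note, if the value column could ever contain NULLs, you might prefer a defensive variant that substitutes an empty string; a hypothetical sketch (the concat_str_safe name is made up for illustration, not part of the original post):

  scala> registerFunction("concat_str_safe", (a: Int, b: String) => a.toString + Option(b).getOrElse(""))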





