分享

spark 1.3.0 将dataframe数据写入Hive分区表

pig2 2015-4-6 00:12:46 发表于 实操演练 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 7 184453

问题导读

1.将DataFrame数据如何写入到Hive表中?
2.通过那个API实现创建spark临时表?
3.如何将DataFrame数据写入hive指定数据表的分区中?





从spark1.2 到spark1.3,spark SQL中的SchemaRDD变为了DataFrame,DataFrame相对于SchemaRDD有了较大改变,同时提供了更多好用且方便的API。


DataFrame将数据写入hive中时,默认的是hive默认数据库,insertInto没有指定数据库的参数,本文使用了下面方式将数据写入hive表或者hive表的分区中,仅供参考。

1、将DataFrame数据写入到Hive表中

从DataFrame类中可以看到与hive表有关的写入Api有以下几个:

registerTempTable(tableName: String): Unit,
insertInto(tableName: String): Unit

insertInto(tableName: String, overwrite: Boolean): Unit
saveAsTable(tableName: String, source: String, mode: SaveMode, options: Map[String, String]): Unit



有很多重载函数,不一一列举
registerTempTable函数是创建spark临时表

insertInto函数是向表中写入数据,可以看出此函数不能指定数据库和分区等信息,不可以直接进行写入。

向hive数据仓库写入数据必须指定数据库,hive数据表建立可以在hive上建立,或者使用hiveContext.sql(“create table ....")
下面语句是向指定数据库数据表中写入数据:
  1. case class Person(name:String,col1:Int,col2:String)
  2. val sc = new org.apache.spark.SparkContext   
  3. val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
  4. import hiveContext.implicits._
  5. hiveContext.sql("use DataBaseName")
  6. val data = sc.textFile("path").map(x=>x.split("\\s+")).map(x=>Person(x(0),x(1).toInt,x(2)))
  7. data.toDF()insertInto("tableName")
复制代码

创建一个case类将RDD中数据类型转为case类类型,然后通过toDF转换为DataFrame,调用insertInto函数时,首先指定数据库,使用的是hiveContext.sql("use DataBaseName")语句,就可以将DataFrame数据写入hive数据表中了


2、将DataFrame数据写入hive指定数据表的分区中

hive数据表建立可以在hive上建立,或者使用hiveContext.sql(“create table ...."),使用saveAsTable时数据存储格式有限,默认格式为parquet,可以指定为json,如果有其他格式指定,尽量使用语句来建立hive表。

将数据写入分区表的思路是:首先将DataFrame数据写入临时表,之后是由hiveContext.sql语句将数据写入hive分区表中。具体操作如下:
  1. case class Person(name:String,col1:Int,col2:String)
  2. val sc = new org.apache.spark.SparkContext   
  3. val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
  4. import hiveContext.implicits._
  5. hiveContext.sql("use DataBaseName")
  6. val data = sc.textFile("path").map(x=>x.split("\\s+")).map(x=>Person(x(0),x(1).toInt,x(2)))
  7. data.toDF().registerTempTable("table1")
  8. hiveContext.sql("insert into table2 partition(date='2015-04-02') select name,col1,col2 from table1")
复制代码

使用以上方式就可以将dataframe数据写入hive分区表了


已有(7)人评论

跳转到指定楼层
feng01301218 发表于 2015-4-6 13:13:12
回复

使用道具 举报

Yuan 发表于 2016-2-29 22:30:15
你好,DataFrame数据写入hive指定数据表的分区中时产生大量小文件该怎么解决
回复

使用道具 举报

linleran 发表于 2016-3-7 12:53:00
Yuan 发表于 2016-2-29 22:30
你好,DataFrame数据写入hive指定数据表的分区中时产生大量小文件该怎么解决

sqlContext.setConf("spark.sql.shuffle.partitions", "1")
回复

使用道具 举报

lbydd 发表于 2016-3-12 16:49:48
hiveContext.sql("use DataBaseName")
val data = sc.textFile("path").map(x=>x.split("\\s+")).map(x=>Person(x(0),x(1).toInt,x(2)))
data.toDF()insertInto("tableName")
我建立了表,可是还是现实找不到表。使用hiveContext.sql("show tables")也能看到表。可是就是无法添加进去,总是找不到表
py4j.protocol.Py4JJavaError: An error occurred while calling o71.insertInto.
: java.lang.RuntimeException: Table Not Found: allpaper
回复

使用道具 举报

jiameier91 发表于 2017-1-23 20:49:28
求解,将DataFrame数据写入hive指定数据表的分区中。数据量太大的时候一直,报下面的错怎么办?

  • File "/home/work/tools/spark-1.5.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/sql/context.py", line 552, in sql
  •   6   File "/home/work/tools/spark-1.5.0-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
  •   7   File "/home/work/tools/spark-1.5.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/sql/utils.py", line 36, in deco
  •   8   File "/home/work/tools/spark-1.5.0-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
  •   9 py4j.protocol.Py4JJavaError: An error occurred while calling o58.sql.

回复

使用道具 举报

easthome001 发表于 2017-1-23 21:16:39
jiameier91 发表于 2017-1-23 20:49
求解,将DataFrame数据写入hive指定数据表的分区中。数据量太大的时候一直,报下面的错怎么办?

  9 py4j.protocol.Py4JJavaError: An error occurred while calling o58.sql.
详细看看这个 o58.sql的执行。

回复

使用道具 举报

jiameier91 发表于 2017-1-23 21:30:38
easthome001 发表于 2017-1-23 21:16
9 py4j.protocol.Py4JJavaError: An error occurred while calling o58.sql.
详细看看这个 o58.sql的 ...

在哪里看
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条