分享

回归本质……sqlserver拉数据到hbase

remarkzhao 发表于 2017-7-28 09:23:58 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 15 9207
langke93 发表于 2017-7-28 14:15:52
remarkzhao 发表于 2017-7-28 13:14
好啊。我现在有2个问题:

1. 我的dataframe是直接从sqlserver去拉 不用自己构造数据   这样就可以

应该不成立,因为你使用的是默认的。根本就没有构造catalog。
建议,按部就班的来操作。
首先定义catalog

[mw_shl_code=bash,true]def catalog = s"""{
       |"table":{"namespace":"default", "name":"table1"},
       |"rowkey":"key",
       |"columns":{
         |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
         |"col1":{"cf":"cf1", "col":"col1", "type":"boolean"},
         |"col2":{"cf":"cf2", "col":"col2", "type":"double"},
         |"col3":{"cf":"cf3", "col":"col3", "type":"float"},
         |"col4":{"cf":"cf4", "col":"col4", "type":"int"},
         |"col5":{"cf":"cf5", "col":"col5", "type":"bigint"},
         |"col6":{"cf":"cf6", "col":"col6", "type":"smallint"},
         |"col7":{"cf":"cf7", "col":"col7", "type":"string"},
         |"col8":{"cf":"cf8", "col":"col8", "type":"tinyint"}
       |}
     |}""".stripMargin[/mw_shl_code]

在保存DataFrame的时候需要看到,首先是填充数据
[mw_shl_code=bash,true]case class HBaseRecord(
   col0: String,
   col1: Boolean,
   col2: Double,
   col3: Float,
   col4: Int,      
   col5: Long,
   col6: Short,
   col7: String,
   col8: Byte)

object HBaseRecord
{                                                                                                            
   def apply(i: Int, t: String): HBaseRecord = {
      val s = s"""row${"%03d".format(i)}"""      
      HBaseRecord(s,
      i % 2 == 0,
      i.toDouble,
      i.toFloat,  
      i,
      i.toLong,
      i.toShort,  
      s"String$i: $t",      
      i.toByte)
  }
}

val data = (0 to 255).map { i =>  HBaseRecord(i, "extra")}[/mw_shl_code]
可以看到上面的data其实就是keyvalue集合。

在集合的基础上,在转换为rdd,rdd转换为DataFrame。
[mw_shl_code=bash,true]sc.parallelize(data).toDF.write.options(
Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5"))
.format("org.apache.hadoop.hbase.spark ")
.save()[/mw_shl_code]
由于你一开始就已经确定了catalog,并且已经转换为dataframe,再次定义catalog,就没有意义了。



回复

使用道具 举报

remarkzhao 发表于 2017-7-28 14:23:15
langke93 发表于 2017-7-28 14:15
应该不成立,因为你使用的是默认的。根本就没有构造catalog。
建议,按部就班的来操作。
首先定义catal ...

那这么说的话,我就直接去保存就可以了?jdbcDF.write.options(Map(HBaseTableCatalog.newTable -> "5")).format("org.apache.hadoop.hbase.spark ").save()。。。

回复

使用道具 举报

langke93 发表于 2017-7-28 14:34:23
remarkzhao 发表于 2017-7-28 14:23
那这么说的话,我就直接去保存就可以了?jdbcDF.write.options(Map(HBaseTableCatalog.newTable -> "5")) ...

你可以试试,不知道他们是否认识你直接创建的DataFrame。如果不行的话,那就按部就班的来。
回复

使用道具 举报

remarkzhao 发表于 2017-7-28 15:15:27
langke93 发表于 2017-7-28 14:34
你可以试试,不知道他们是否认识你直接创建的DataFrame。如果不行的话,那就按部就班的来。

试了两款 都不行。。

1.  scala> jdbcDF.write.format("org.apache.hadoop.hbase.spark").mode("overwrite").save()
java.lang.AbstractMethodError: org.apache.hadoop.hbase.spark.DefaultSource.createRelation(Lorg/apache/spark/sql/SQLContext;Lorg/apache/spark/sql/SaveMode;Lscala/collection/immutable/Map;Lorg/apache/spark/sql/Dataset;)Lorg/apache/spark/sql/sources/BaseRelation;
  at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:472)
  at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)


2. scala> jdbcDF.write.options(Map("table" -> "TestSparkHBase","zkUrl" -> "hadoop001:2181")).format("org.apache.hadoop.hbase.spark").mode("overwrite").save()
java.lang.AbstractMethodError: org.apache.hadoop.hbase.spark.DefaultSource.createRelation(Lorg/apache/spark/sql/SQLContext;Lorg/apache/spark/sql/SaveMode;Lscala/collection/immutable/Map;Lorg/apache/spark/sql/Dataset;)Lorg/apache/spark/sql/sources/BaseRelation;


回复

使用道具 举报

langke93 发表于 2017-7-28 19:38:41
remarkzhao 发表于 2017-7-28 15:15
试了两款 都不行。。

1.  scala> jdbcDF.write.format("org.apache.hadoop.hbase.spark").mode("overw ...

不行,你就按部就班的来。首先把它给你的例子做出来。
然后在把数据从mysql里,按照例子的方式拉出来。
这里面的关键是怎么填充数据。
[mw_shl_code=bash,true]case class HBaseRecord(
   col0: String,
   col1: Boolean,
   col2: Double,
   col3: Float,
   col4: Int,      
   col5: Long,
   col6: Short,
   col7: String,
   col8: Byte)

object HBaseRecord
{                                                                                                            
   def apply(i: Int, t: String): HBaseRecord = {
      val s = s"""row${"%03d".format(i)}"""      
      HBaseRecord(s,
      i % 2 == 0,
      i.toDouble,
      i.toFloat,  
      i,
      i.toLong,
      i.toShort,  
      s"String$i: $t",      
      i.toByte)
  }
}

val data = (0 to 255).map { i =>  HBaseRecord(i, "extra")}[/mw_shl_code]

val data = (0 to 255).map { i => HBaseRecord(i, "extra")
看到这个吧,这个是有256条记录。
这里他很有可能是省略了数据,全部填充为extra,所以先把这个例子解决了。然后在捣鼓你的mysql。

回复

使用道具 举报

remarkzhao 发表于 2017-7-30 17:09:13
langke93 发表于 2017-7-28 19:38
不行,你就按部就班的来。首先把它给你的例子做出来。
然后在把数据从mysql里,按照例子的方式拉出来。
...
嗯。先看看。。
回复

使用道具 举报

12
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条