That likely doesn't work, because you are relying on the defaults; no catalog was ever constructed at all.
My suggestion: work through it step by step.
First, define the catalog:
[mw_shl_code=bash,true]def catalog = s"""{
|"table":{"namespace":"default", "name":"table1"},
|"rowkey":"key",
|"columns":{
|"col0":{"cf":"rowkey", "col":"key", "type":"string"},
|"col1":{"cf":"cf1", "col":"col1", "type":"boolean"},
|"col2":{"cf":"cf2", "col":"col2", "type":"double"},
|"col3":{"cf":"cf3", "col":"col3", "type":"float"},
|"col4":{"cf":"cf4", "col":"col4", "type":"int"},
|"col5":{"cf":"cf5", "col":"col5", "type":"bigint"},
|"col6":{"cf":"cf6", "col":"col6", "type":"smallint"},
|"col7":{"cf":"cf7", "col":"col7", "type":"string"},
|"col8":{"cf":"cf8", "col":"col8", "type":"tinyint"}
|}
|}""".stripMargin[/mw_shl_code]
When saving the DataFrame, the first step is to generate the data to write:
[mw_shl_code=bash,true]// One field per catalog column; col0 doubles as the HBase row key.
case class HBaseRecord(
  col0: String,
  col1: Boolean,
  col2: Double,
  col3: Float,
  col4: Int,
  col5: Long,
  col6: Short,
  col7: String,
  col8: Byte)

object HBaseRecord {
  // Builds one record per integer i; the row key is zero-padded so
  // rows sort lexicographically in HBase.
  def apply(i: Int, t: String): HBaseRecord = {
    val s = s"""row${"%03d".format(i)}"""
    HBaseRecord(s,
      i % 2 == 0,
      i.toDouble,
      i.toFloat,
      i,
      i.toLong,
      i.toShort,
      s"String$i: $t",
      i.toByte)
  }
}

val data = (0 to 255).map { i => HBaseRecord(i, "extra") }[/mw_shl_code]
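For example, HBaseRecord(1, "extra") produces the row key "row001" with col1 = false (1 is odd), col2 = 1.0, col7 = "String1: extra", and so on for the remaining columns.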
As you can see, data is really just a collection of key-value records.
From that collection, convert to an RDD, then from the RDD to a DataFrame, and write it out (newTable -> "5" asks the connector to create the table with 5 regions if it does not exist yet):
[mw_shl_code=bash,true]import org.apache.hadoop.hbase.spark.datasources.HBaseTableCatalog
import spark.implicits._ // for .toDF; spark-shell provides `spark` and `sc`

sc.parallelize(data).toDF.write
  .options(Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5"))
  .format("org.apache.hadoop.hbase.spark") // note: no trailing space in the data source name
  .save()[/mw_shl_code]
Since you already defined the catalog at the very start and have converted the data to a DataFrame, defining the catalog a second time serves no purpose.
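To confirm the write succeeded, you can read the table back with the same catalog. This is only a sketch under the same assumptions (hbase-spark connector on the classpath, spark-shell session); depending on the connector version you may also need an HBaseContext set up first:
[mw_shl_code=bash,true]// Sketch: read back through the same catalog (hbase-spark connector assumed).
val df = spark.read
  .options(Map(HBaseTableCatalog.tableCatalog -> catalog))
  .format("org.apache.hadoop.hbase.spark")
  .load()
df.filter($"col0" <= "row005").select("col0", "col1").show()[/mw_shl_code]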