分享

Spark插件hbase-rdd二次开发

regan 2016-7-29 15:07:14 发表于 入门帮助 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 1 6934
本帖最后由 regan 于 2016-9-9 09:36 编辑

hbase-rdd二次开发


  • 创建: 张敏,最新修改: 昨天3:41 下午


hbase-rdd是一个构建在SparkContext基础之上的用于对Hbase进行增删改查的第三方开源模块,目前最新版本为0.7.1。目前该rdd在操作hbase时,默认调用隐式方法
implicit def stringToBytes(s: String): Array[Byte] = {
  Bytes.toBytes(s)
}
将RDD的key转换成字节b,然后调用Hbase的put(b)方法保存rowkey,之后将RDD的每一行存入hbase。

在轨迹图绘制项目数据计算中,我们考虑到hbase的rowkey的设计——尽量减少rowkey存储的开销。虽然hbase-rdd最终的rowkey默认都是采用字节数组,但这个地方的话,我们希望按自己的方式组装rowkey。使用MD5(imei)+dateTime组成的字节数组作为rowkey。因此默认的hbase-rdd提供的方法是不满足我们存储需求的,因此需要对源代码进行修改。在toHbase方法中,有一个convert方法,该方法将对RDD中的每一行数据进行转化,使用RDD中的key生成Put(Bytes.toBytes(key))对象,该对象为之后存储Hbase提供rowkey。
在convert函数中,对其实现进行了改造,hbase-rdd默认使用stringToBytes隐式函数将RDD的String类型的key转换成字节数组,这里我们进行去下改造,不使用stringToBytes隐式方法,直接生成字节数据。
protected def convert(id: String, values: Map[String, Map[String, A]], put: PutAdder[A]) = {
  val strs = id.split(",")
  val imei = strs {0}
  val dateTime = strs {1}
  val b1 = MD5Utils.computeMD5Hash(imei.getBytes())
  val b2 = Bytes.toBytes(dateTime.toLong)
  val key = b1.++(b2)
  val p = new Put(key)//改造
  var empty = true
for {
    (family, content) <- values
    (key, value) <- content
  } {
    empty = false
if (StrUtils.isNotEmpty(family) && StrUtils.isNotEmpty(key)) {
      put(p, family, key, value)
    }
  }

  if (empty) None else Some(new ImmutableBytesWritable, p)
}

这样就实现了使用自己的方式构建rowkey,我们可以使用任意的方式构建rowkey了。

在使用hbase-rdd的过程中,也曾今问过自己,默认的RDD上是没有toHbase方法的,那为什么引入hbase-rdd包 之后,RDD之上就有toHbase方法了?
经过查看源码,发现hbase-rdd包中提供了两个隐式方法:
implicit def toHBaseRDDSimple[A](rdd: RDD[(String, Map[String, A])])(implicit writer: Writes[A]): HBaseWriteRDDSimple[A] =
  new HBaseWriteRDDSimple(rdd, pa[A])

implicit def toHBaseRDDSimpleTS[A](rdd: RDD[(String, Map[String, (A, Long)])])(implicit writer: Writes[A]): HBaseWriteRDDSimple[(A, Long)] =
  new HBaseWriteRDDSimple(rdd, pa[A])

这两个方法在发现RDD上没有toHbase方法时会自动尝试调用,调用之后将会在新建HBaseWriteRDDSimple对象,而在HBaseWriteRDDSimple对象中是有toHbase方法的,因此在引入hbase-rdd之后,可以发现原本没有toHbase方法的RDD上有toHbase方法了。这一切都要归功于Scala强大的隐式转换功能。
希望对大家以后的开发由帮助。同时借鉴hbase-rdd的构架方式,在Spark之上构建自己的小模块。



已有(1)人评论

跳转到指定楼层
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条