分享

spark bulkload hbase 代码已写完,字典排序遇到问题了。

大家好,代码写完了。遇到:Caused by: java.io.IOException: Added a key not lexically larger than previous. Current cell = 0|000024481100|10_000024481100|10_00000185_020160907135900/f:AGE/1519957986116/Put/vlen=1/seqid=0, lastCell = 0|000024481100|10_000024481100|10_00000185_020160907135900/f:WARD_NAME/1519957986116/Put/vlen=8/seqid=0  这个老生常谈的问题了。
说是rowkey?kv没排序好,但是不知道怎么排序啊,我都排了好多次了。。

下面是我的代码,有经验的请指教 不甚感激

object BulkToHBase {
  def main(args: Array[String]): Unit = {

    val xs: Class[ImmutableBytesWritable] = classOf[ImmutableBytesWritable]
    val classess: Array[Class[_]] = Array(classOf[ImmutableBytesWritable], classOf[KeyValue],
      classOf[Put], classOf[ImmutableBytesWritable.Comparator])
    val sparkConf = new SparkConf().setAppName("BulkToHBase").setMaster("local[2]")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.referenceTracking", "false")
      .registerKryoClasses(classess)
    val spark = SparkSession.builder.config(sparkConf).getOrCreate
    spark.sparkContext.setLogLevel("warn")

    val connectionProperties = new Properties()
    connectionProperties.setProperty("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
    val predicates = Array[String]("RIGHT(EXAM_RESULT_ID,1) = '0' ",
      "RIGHT(EXAM_RESULT_ID,1) = '1' ",
      "RIGHT(EXAM_RESULT_ID,1) = '2' ",
      "RIGHT(EXAM_RESULT_ID,1) = '3' ",
      "RIGHT(EXAM_RESULT_ID,1) = '4' ",
      "RIGHT(EXAM_RESULT_ID,1) = '5' ",
      "RIGHT(EXAM_RESULT_ID,1) = '6' ",
      "RIGHT(EXAM_RESULT_ID,1) = '7' ",
      "RIGHT(EXAM_RESULT_ID,1) = '8' ",
      "RIGHT(EXAM_RESULT_ID,1) = '9' ")
    val url = "jdbc:sqlserver://192.168.1.21;username=sa;password=yishidb;database=CDRDB16;useUnicode=true&characterEncoding=UTF-8"
    val tableName = "DC_EXAM_RESULT"
    val dataSet = spark.read.jdbc(url, tableName, predicates, connectionProperties)
    val cols: Array[String] = dataSet.columns.sorted
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "master,slave01,slave02")
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    val tablename = "DC_EXAM_RESULT1"
    val table = new HTable(conf, tablename)
    conf.set(TableOutputFormat.OUTPUT_TABLE, tablename)
    lazy val job = Job.getInstance(conf)
    job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setMapOutputValueClass(classOf[KeyValue])
    HFileOutputFormat2.configureIncrementalLoad(job, table)

    val filterdDataset = dataSet.filter(x => x.getAs("PATIENT_ID") != null && x.getAs("EXAM_RESULT_ID") != null && x.getAs("ENCOUNTER_ID") != null)

    val value = filterdDataset.rdd
      .flatMap(x => {
        val partition = StringUtils.leftPad(Integer.toString(Math.abs(x.getAs("PATIENT_ID").hashCode % 10)), 1, '0')
        val rowKey = partition + "|" + x.getAs("PATIENT_ID").toString + "|" +
          x.getAs("ENCOUNTER_ID").toString() + "|" + x.getAs("EXAM_RESULT_ID").toString()
        val arrayBuffer = new ArrayBuffer[(ImmutableBytesWritable, KeyValue)]
        for (i <- 0 until cols.length) yield {
          if (x.getAs(i) != null) {
            val kv: KeyValue = new KeyValue(Bytes.toBytes(rowKey), Bytes.toBytes("f"),
              Bytes.toBytes(cols(i)), Bytes.toBytes(x.getAs(i).toString))
            arrayBuffer.+=((new ImmutableBytesWritable(Bytes.toBytes(rowKey)), kv))
            (new ImmutableBytesWritable(Bytes.toBytes(rowKey)), kv)
          }
        }
        arrayBuffer
      }
      )
    value.persist(StorageLevel.MEMORY_AND_DISK)
   val sortedBykey = value.sortByKey()

    sortedBykey.saveAsNewAPIHadoopFile("/tmp/data2", classOf[ImmutableBytesWritable], classOf[KeyValue], classOf[HFileOutputFormat2], job.getConfiguration());
    val bulkLoader = new LoadIncrementalHFiles(conf)
    bulkLoader.doBulkLoad(new Path("/tmp/data2"), table)
    spark.stop()
  }
}

已有(15)人评论

跳转到指定楼层
langke93 发表于 2018-3-2 13:21:18
我记得好像是放到keyvalue对象里,其实已经排好序了。不需要再排序了
   val sortedBykey = value.sortByKey()

把上面去掉试试
回复

使用道具 举报

remarkzhao 发表于 2018-3-2 14:36:12
langke93 发表于 2018-3-2 13:21
我记得好像是放到keyvalue对象里,其实已经排好序了。不需要再排序了
   val sortedBykey = value.sortByK ...

不行哦,还是同样的错误。应该是对rowkey,列族,列 的排序。。keyvalue之后应该要再排序,但是不会排
回复

使用道具 举报

hello2019 发表于 2018-3-2 16:52:24
本帖最后由 hello2019 于 2018-3-2 16:55 编辑
remarkzhao 发表于 2018-3-2 14:36
不行哦,还是同样的错误。应该是对rowkey,列族,列 的排序。。keyvalue之后应该要再排序,但是不会排

arrayBuffer排下序arrayBuffer.sorted  

同时下面排下序
val sort = value.sortBy(_._1)  
回复

使用道具 举报

remarkzhao 发表于 2018-3-2 16:56:27
本帖最后由 remarkzhao 于 2018-3-2 17:01 编辑
hello2019 发表于 2018-3-2 16:52
arrayBuffer排下序arrayBuffer.sorted  

同时下面排下序

额。这个  value不就是arraryBuffer的返回吗。排的话应该怎么排   

在返回那里 arraryBuffer.sorted?

是这样吗?


val value = filterdDataset.rdd
      .flatMap(x => {
        val partition = StringUtils.leftPad(Integer.toString(Math.abs(x.getAs("PATIENT_ID").hashCode % 10)), 1, '0')
        val rowKey = partition + "|" + x.getAs("PATIENT_ID").toString + "|" +
          x.getAs("ENCOUNTER_ID").toString() + "|" + x.getAs("EXAM_RESULT_ID").toString()
        val arrayBuffer = new ArrayBuffer[(ImmutableBytesWritable, KeyValue)]
        for (i <- 0 until cols.length) yield {
          if (x.getAs(i) != null) {
            val kv: KeyValue = new KeyValue(Bytes.toBytes(rowKey), Bytes.toBytes("f"),
              Bytes.toBytes(cols(i)), Bytes.toBytes(x.getAs(i).toString))
            arrayBuffer.+=((new ImmutableBytesWritable(Bytes.toBytes(rowKey)), kv))
            (new ImmutableBytesWritable(Bytes.toBytes(rowKey)), kv)
          }
        }
        arrayBuffer.sorted
      }
      )
    value.persist(StorageLevel.MEMORY_AND_DISK)
   val sortedBykey = value.sortBy(_._1)  

    sortedBykey.saveAsNewAPIHadoopFile("/tmp/data2", classOf[ImmutableBytesWritable], classOf[KeyValue], classOf[HFileOutputFormat2], job.getConfiguration());
    val bulkLoader = new LoadIncrementalHFiles(conf)
    bulkLoader.doBulkLoad(new Path("/tmp/data2"), table)
    spark.stop()
  }
}

回复

使用道具 举报

hello2019 发表于 2018-3-2 17:53:02
本帖最后由 hello2019 于 2018-3-2 17:54 编辑
remarkzhao 发表于 2018-3-2 16:56
额。这个  value不就是arraryBuffer的返回吗。排的话应该怎么排   

在返回那里 arraryBuffer.sorted? ...

下面这样
val value = filterdDataset.rdd
      .flatMap(x => {
        val partition = StringUtils.leftPad(Integer.toString(Math.abs(x.getAs("PATIENT_ID").hashCode % 10)), 1, '0')
        val rowKey = partition + "|" + x.getAs("PATIENT_ID").toString + "|" +
          x.getAs("ENCOUNTER_ID").toString() + "|" + x.getAs("EXAM_RESULT_ID").toString()
        val arrayBuffer = new ArrayBuffer[(ImmutableBytesWritable, KeyValue)]
        for (i <- 0 until cols.length) yield {
          if (x.getAs(i) != null) {
            val kv: KeyValue = new KeyValue(Bytes.toBytes(rowKey), Bytes.toBytes("f"),
              Bytes.toBytes(cols(i)), Bytes.toBytes(x.getAs(i).toString))
            arrayBuffer.+=((new ImmutableBytesWritable(Bytes.toBytes(rowKey)), kv))
            (new ImmutableBytesWritable(Bytes.toBytes(rowKey)), kv)
          }
        }
    val  arrayBufferNew=arrayBuffer.sorted
    arrayBufferNew
      }
      )
    value.persist(StorageLevel.MEMORY_AND_DISK)
   val sortedBykey = value.sortBy(_._1)  

    sortedBykey.saveAsNewAPIHadoopFile("/tmp/data2", classOf[ImmutableBytesWritable], classOf[KeyValue], classOf[HFileOutputFormat2], job.getConfiguration());
    val bulkLoader = new LoadIncrementalHFiles(conf)
    bulkLoader.doBulkLoad(new Path("/tmp/data2"), table)
    spark.stop()
  }
}



回复

使用道具 举报

remarkzhao 发表于 2018-3-2 18:05:02
hello2019 发表于 2018-3-2 17:53
下面这样
val value = filterdDataset.rdd
      .flatMap(x => {

试过了 不行。
回复

使用道具 举报

hello2019 发表于 2018-3-2 18:13:44

还是原先的错误??你把前100行,打印出来看下数据。
看下是数据没有排序成功,还是什么情况。
回复

使用道具 举报

remarkzhao 发表于 2018-3-5 12:04:41
hello2019 发表于 2018-3-2 18:13
还是原先的错误??你把前100行,打印出来看下数据。
看下是数据没有排序成功,还是什么情况。

是的,还是原先的错误,打印不出来。
回复

使用道具 举报

hello2019 发表于 2018-3-7 07:40:15
remarkzhao 发表于 2018-3-5 12:04
是的,还是原先的错误,打印不出来。

val value = filterdDataset.rdd
      .flatMap(x => {
        val partition = StringUtils.leftPad(Integer.toString(Math.abs(x.getAs("PATIENT_ID").hashCode % 10)), 1, '0')
        val rowKey = partition + "|" + x.getAs("PATIENT_ID").toString + "|" +
          x.getAs("ENCOUNTER_ID").toString() + "|" + x.getAs("EXAM_RESULT_ID").toString()
        val arrayBuffer = new ArrayBuffer[(ImmutableBytesWritable, KeyValue)]
        for (i <- 0 until cols.length) yield {
          if (x.getAs(i) != null) {
            val kv: KeyValue = new KeyValue(Bytes.toBytes(rowKey), Bytes.toBytes("f"),
              Bytes.toBytes(cols(i)), Bytes.toBytes(x.getAs(i).toString))
            arrayBuffer.+=((new ImmutableBytesWritable(Bytes.toBytes(rowKey)), kv))
            (new ImmutableBytesWritable(Bytes.toBytes(rowKey)), kv)
          }
        }
    val  arrayBufferNew=arrayBuffer.sorted
    arrayBufferNew
      }
      )
    value.persist(StorageLevel.MEMORY_AND_DISK)
   val sortedBykey = value.sortBy(_._1)  
把下面注释掉在打印
  #  sortedBykey.saveAsNewAPIHadoopFile("/tmp/data2", classOf[ImmutableBytesWritable], classOf[KeyValue], classOf[HFileOutputFormat2], #job.getConfiguration());
  #  val bulkLoader = new LoadIncrementalHFiles(conf)
    #bulkLoader.doBulkLoad(new Path("/tmp/data2"), table)
    spark.stop()
  }
}
回复

使用道具 举报

12下一页
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条