分享

还是关于导入hbase的问题。。

remarkzhao 发表于 2017-8-28 11:49:13 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 9 5797
本帖最后由 remarkzhao 于 2017-8-28 13:26 编辑

如何从sqlserver里把数据导入到hbase里,这个问题纠结了我一个月,各种方法的取舍,以及判断 还是有点迷糊,目前想尝试以下方法 还是有点问题 望
各位大神指教一下

方案: 从sqlserver里拉去dataframe,再保存为RDD,然后在用RDD导入hbase的api进行hbase的数据导入

目前遇到的问题:

      1. scala代码编写之后 打成jar进行提交的时候出现下标越界异常

      2. 打jar的时候有时候会遇到错误:找不到spark-parent_2.11-2.2.0.jar
         在idea开发的时候错误是这样的:SBT project import: [error] Server access Error: Connection timed out: connect url=http://maven.ibiblio.org/maven2/org/apache/spark/spark-parent_2.11/2.2.0/spark-parent_2.11-2.2.0.jar,但是很奇怪,在linux下直接 sbt/sbt pacage 有时候就可以 有时候就不可以。

      3. 有没有scala的算子直接把sqlserver里拉出的dataframe 保存为rdd  或者拉出来的就是RDD 不是dataframe ,我这个是先dataframe以保存rdd的形式到本地再用textFile拿出来。

代码如下,望各位大神指点:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase._
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object ImportToHBase{

   def main(args: Array[String]){

      val conf = HBaseConfiguration.create()
      val sc = new SparkContext(new SparkConf())

     val readFile = sc.textFile("file:///root/tools/spark/mycode/DC_EXAM_RESULT.txt").map(x => x.split(","))  //DC_EXAM_RESULT.txt 是我从sqlserver拉出来dataframe进行 .rdd.saveAsTexFile之后的目录,目录下的问part-00000是csv文件, 以逗号为分隔符
   val tableName = "Document"
   readFile.foreachPartition{
  x=> {
    val myConf = HBaseConfiguration.create()
   myConf.set("hbase.zookeeper.quorum", "hadoop001,hadoop002,hadoop003")
    myConf.set("hbase.zookeeper.property.clientPort", "2181")
    myConf.set("hbase.defaults.for.version.skip", "true")
    val myTable = new HTable(myConf, TableName.valueOf(tableName))
    myTable.setAutoFlush(false, false)
    myTable.setWriteBufferSize(5*1024*1024)
    x.foreach { y => {
      val p = new Put(Bytes.toBytes(y(3)))   // csv的第4列为rowkey
      p.add("Family".getBytes, "qualifier".getBytes, Bytes.toBytes(y(45)))   //csv的第46列为列族的列的内容 ,只有一个列族,一个列。
     myTable.put(p)
    }
    }
    myTable.flushCommits()

    }

   }


  }
}

以下是异常:

  [Stage 0:>                                                         (0 + 1) / 36]15711 [Executor task launch worker for task 0] ERROR org.apache.spark.executor.Executor  - Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.ArrayIndexOutOfBoundsException: 45
        at ImportToHBase$$anonfun$main$1$$anonfun$apply$1.apply(ImportToHBase.scala:30)
        at ImportToHBase$$anonfun$main$1$$anonfun$apply$1.apply(ImportToHBase.scala:28)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
        at ImportToHBase$$anonfun$main$1.apply(ImportToHBase.scala:28)
        at ImportToHBase$$anonfun$main$1.apply(ImportToHBase.scala:20)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:108)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)
16114 [task-result-getter-0] ERROR org.apache.spark.scheduler.TaskSetManager  - Task 0 in stage 0.0 failed 1 times; aborting job
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.ArrayIndexOutOfBoundsException: 45
        at ImportToHBase$$anonfun$main$1$$anonfun$apply$1.apply(ImportToHBase.scala:30)
        at ImportToHBase$$anonfun$main$1$$anonfun$apply$1.apply(ImportToHBase.scala:28)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
        at ImportToHBase$$anonfun$main$1.apply(ImportToHBase.scala:28)
        at ImportToHBase$$anonfun$main$1.apply(ImportToHBase.scala:20)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:108)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1714)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2087)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:926)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:924)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
        at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:924)
        at ImportToHBase$.main(ImportToHBase.scala:19)
        at ImportToHBase.main(ImportToHBase.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:755)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 45
        at ImportToHBase$$anonfun$main$1$$anonfun$apply$1.apply(ImportToHBase.scala:30)
        at ImportToHBase$$anonfun$main$1$$anonfun$apply$1.apply(ImportToHBase.scala:28)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
        at ImportToHBase$$anonfun$main$1.apply(ImportToHBase.scala:28)
        at ImportToHBase$$anonfun$main$1.apply(ImportToHBase.scala:20)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:108)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)
16608 [Thread-2] INFO  org.spark_project.jetty.server.AbstractConnector  - Stopped Spark@431a7c26{HTTP/1.1,[http/1.1]}{0.0.0.0:4041}



已有(9)人评论

跳转到指定楼层
sstutu 发表于 2017-8-28 13:21:25
个人观点仅供参考:
1. scala代码编写之后 打成jar进行提交的时候出现下标越界异常
越界,里面没有发现数组,不知道是不是spark里面造成的。对于main函数,这个Array楼主输入的什么。这个也需要看看。
def main(args: Array[String])

2. 打jar的时候有时候会遇到错误:找不到spark-parent_2.11-2.2.0.jar
         在idea开发的时候错误是这样的:SBT project import: [error] Server access Error: Connection timed out: connect url=http://maven.ibiblio.org/maven2/org/apache/spark/spark-parent_2.11/2.2.0/spark-parent_2.11-2.2.0.jar,但是很奇怪,在linux下直接 sbt/sbt pacage 有时候就可以 有时候就不可以。
这个最好下载到本地库,很可能是因为网络的原因造成的

3. 有scala的算子直接把sqlserver里拉出的dataframe 保存为rdd  或者拉出来的就是RDD 不是dataframe (这个问题好像有点low)
楼主的代码应该用的就是rdd,而非DataFrame
推荐参考spark小知识总结
http://www.aboutyun.com/forum.php?mod=viewthread&tid=20920


回复

使用道具 举报

remarkzhao 发表于 2017-8-28 13:30:46
sstutu 发表于 2017-8-28 13:21
个人观点仅供参考:
1. scala代码编写之后 打成jar进行提交的时候出现下标越界异常
越界,里面没有发现 ...

恩恩。明白。。

spark 提交的语句是这句: spark-submit --driver-class-path /root/tools/spark/jars/hbase/*:/root/tools/spark/jars/*:/root/tools/hbase/conf  --class ImportToHBase  /root/tools/spark/mycode/hbase/target/scala-2.11/simple-project_2.11-1.0.jar

异常里提示的下标越界是指我scala代码中写的 28行跟30行有问题。所以应该不会在main函数里有错误。

spark_parent 那个jar我找过maven 跟ali的镜像 居然没找到这个jar包,也是醉了。

我scala代码中用的是rdd ,我原先用scala从sqlserver里拉过来保存到本地的。

无论如何多谢了。

还有大神指点一下吗??求助ING。

回复

使用道具 举报

nextuser 发表于 2017-8-28 13:44:41
remarkzhao 发表于 2017-8-28 13:30
恩恩。明白。。

spark 提交的语句是这句: spark-submit --driver-class-path /root/tools/spark/jars ...

越界的代码行,贴出来看下
回复

使用道具 举报

remarkzhao 发表于 2017-8-28 13:46:18
nextuser 发表于 2017-8-28 13:44
越界的代码行,贴出来看下

恩恩。。。来了。。object ImportToHBase{

   def main(args: Array[String]){

      val conf = HBaseConfiguration.create()
      val sc = new SparkContext(new SparkConf())

     val readFile = sc.textFile("file:///root/tools/spark/mycode/DC_EXAM_RESULT.txt").map(x => x.split(","))  //DC_EXAM_RESULT.txt 是我从sqlserver拉出来dataframe进行 .rdd.saveAsTexFile之后的目录,目录下的问part-00000是csv文件, 以逗号为分隔符
   val tableName = "Document"
   readFile.foreachPartition{
  x=> {
    val myConf = HBaseConfiguration.create()
   myConf.set("hbase.zookeeper.quorum", "hadoop001,hadoop002,hadoop003")
    myConf.set("hbase.zookeeper.property.clientPort", "2181")
    myConf.set("hbase.defaults.for.version.skip", "true")
    val myTable = new HTable(myConf, TableName.valueOf(tableName))
    myTable.setAutoFlush(false, false)
    myTable.setWriteBufferSize(5*1024*1024)
    x.foreach { y => {
      val p = new Put(Bytes.toBytes(y(3)))   // csv的第4列为rowkey
      p.add("Family".getBytes, "qualifier".getBytes, Bytes.toBytes(y(45)))   //csv的第46列为列族的列的内容 ,只有一个列族,一个列。
     myTable.put(p)

    }
    }
    myTable.flushCommits()

    }

   }


  }
}


回复

使用道具 举报

nextuser 发表于 2017-8-28 13:57:48
remarkzhao 发表于 2017-8-28 13:46
恩恩。。。来了。。object ImportToHBase{

   def main(args: Array[String]){

      val p = new Put(Bytes.toBytes(y(3)))   // csv的第4列为rowkey

rowkey 最大为64KB。看看是不有的超过了。
来自:
hbase cell最大字节数多少
http://www.aboutyun.com/forum.php?mod=viewthread&tid=10413



回复

使用道具 举报

remarkzhao 发表于 2017-8-28 14:06:41
nextuser 发表于 2017-8-28 13:57
val p = new Put(Bytes.toBytes(y(3)))   // csv的第4列为rowkey

rowkey 最大为64KB。看看是不 ...

rowkey也没多大啊固定长度跟格式: 10_00048113_020161224150834   

后面那个y(45)  大概这样的: CT头颅,CT头颅,0,放射科,CT头颅,头颅平扫(序列扫描),头颅,头颅,null,两侧大脑半球对称,中线结构居中。左侧基底节可见点状低密度影,两侧侧脑室前角、后角周围脑白质密度减低。脑室系统不大,脑沟、脑裂未见明显增宽。

回复

使用道具 举报

remarkzhao 发表于 2017-8-28 14:16:09
nextuser 发表于 2017-8-28 13:57
val p = new Put(Bytes.toBytes(y(3)))   // csv的第4列为rowkey

rowkey 最大为64KB。看看是不 ...

看过了。
1个数字一个字节, 按照我这个rowkey  顶多 20个字节

y(45) 那个  就算100个汉字,也就200个字节,远没到kb级别。。


现在不知道从哪入手去找错误了。
回复

使用道具 举报

desehawk 发表于 2017-8-28 20:58:10
remarkzhao 发表于 2017-8-28 14:16
看过了。
1个数字一个字节, 按照我这个rowkey  顶多 20个字节

调试下代码,跟踪运行
回复

使用道具 举报

remarkzhao 发表于 2017-8-28 20:59:47
desehawk 发表于 2017-8-28 20:58
调试下代码,跟踪运行

可能找到错误了。保留的时候要成csv格式,读的也时候也要读csv,而不是自己去割文件。
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条