分享

SparkStreaming写入Hbase遇到包问题,跪求各位大神帮忙

wangweislk 发表于 2015-11-18 12:13:33 [显示全部楼层] 只看大图 回帖奖励 阅读模式 关闭右栏 7 19968
在使用SparkStreaming写入Hbase时,出现下面的错误。
QQ图片20151118120930.png

QQ图片20151118120924.png
提交命令:
spark-submit\
--master yarn-client \
--driver-memory 10g \
--executor-memory 20g \
--num-executors 10 \
--executor-cores 6 \
--jars /data1/bbdhadoop/bbdhadoop/jars/stanford-corenlp-3.4.1.jar,/data1/bbdhadoop/bbdhadoop/jars/spark-streaming-kafka_2.10-1.3.0.jar,/opt/cloudera/parcels/CDH-5.4.2-1.cdh5.4.2.p0.2/jars/htrace-core-3.1.0-incubating.jar \
--class com.bbd.test.Test /data1/bbdhadoop/wangwei/glf/testhbase2.jar c2namenode3 test_kafka_streaming test 1 \



mapToPair.foreach(new Function2<JavaPairRDD<String, Integer>, Time, Void>() {
                        @Override
                        public Void call(JavaPairRDD<String, Integer> values,Time time) throws Exception {

                                values.foreach(new VoidFunction<Tuple2<String, Integer>>() {
                                        @Override
                                        public void call(Tuple2<String, Integer> tuple)
                                                        throws Exception {
                                                Configuration conf = HBaseConfiguration.create();
                                                conf.set("hbase.zookeeper.quorum", "c2namenode3,c2namenode2,c2namenode1");
                                                HTable table = new HTable(conf, "hbase_test".getBytes());
//                                                HTableInterface table = hTablePool.getTable("hbase_test");
                                                Put put = new Put(tuple._1().getBytes());
                                                put.add("cf".getBytes(), "val".getBytes(), (tuple._2+"").getBytes());
                                                table.put(put);
                                                table.close();
                                        }
                                });

                                return null;
                        }
                });
                mapToPair.print();
               
                jssc.start();
                jssc.awaitTermination();



已有(7)人评论

跳转到指定楼层
bioger_hit 发表于 2015-11-18 13:40:20
楼主

spark-submit\
--master yarn-client \
--driver-memory 10g \
--executor-memory 20g \
--num-executors 10 \
--executor-cores 6 \

master yarn-client而不是yarn-cluster


对hbase操作,我一般用下面代码
                conf = HBaseConfiguration.create();
                conf.set("hbase.zookeeper.quorum", "master");// 使用eclipse时必须添加这个,否则无法定位master需要配置hosts
                conf.set("hbase.zookeeper.property.clientPort", "2181");
回复

使用道具 举报

wangweislk 发表于 2015-11-18 13:59:07

yarn-client和yarn-cluster我都试过的。对于加端口,我再直接测试hbase的时指定zk就行,我试试加上端口
回复

使用道具 举报

wangweislk 发表于 2015-11-18 15:24:08

我使用官方给的Python例子也是异常的问题。
if __name__ == "__main__":
    if len(sys.argv) != 7:
        print("""
        Usage: hbase_outputformat <host> <table> <row> <family> <qualifier> <value>
        Run with example jar:
        ./bin/spark-submit --driver-class-path /path/to/example/jar \
        /path/to/examples/hbase_outputformat.py <args>
        Assumes you have created <table> with column family <family> in HBase
        running on <host> already
        """, file=sys.stderr)
        exit(-1)
    host = sys.argv[1]
    table = sys.argv[2]
    sc = SparkContext(appName="HBaseOutputFormat")
    conf = {"hbase.zookeeper.quorum": host,
            "hbase.mapred.outputtable": table,
            "mapreduce.outputformat.class": "org.apache.hadoop.hbase.mapreduce.TableOutputFormat",
            "mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
            "mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable"}
    keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"
    valueConv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter"
    sc.parallelize([sys.argv[3:]]).map(lambda x: (x[0], x)).saveAsNewAPIHadoopDataset(
        conf=conf,
        keyConverter=keyConv,
        valueConverter=valueConv)
    sc.stop()

QQ截图20151118152351.png
回复

使用道具 举报

bioger_hit 发表于 2015-11-18 18:42:40
wangweislk 发表于 2015-11-18 15:24
我使用官方给的Python例子也是异常的问题。

集群正常吗?是按照正常的方式提交集群
回复

使用道具 举报

wangweislk 发表于 2015-11-18 20:04:09
bioger_hit 发表于 2015-11-18 18:42
集群正常吗?是按照正常的方式提交集群

集群OK的
回复

使用道具 举报

wangweislk 发表于 2015-11-19 09:29:28
bioger_hit 发表于 2015-11-18 18:42
集群正常吗?是按照正常的方式提交集群

我已经使用官方给的例子用Java和Python都测试过了,都是找不到org.apache.htrace.Trace包,要怎么将这个加到classpath,这个类的包和org.cloudera.htrace是不是有冲突
回复

使用道具 举报

wangweislk 发表于 2015-11-19 14:33:55
整了几天终于OK了,尼玛才是环境变量问题。
shell:
HBASE_HOME=/opt/cloudera/parcels/CDH/lib/hbase
#HIVE_HOME=/opt/cloudera/parcels/CDH/lib/hive
export SPARK_CLASSPATH="$HBASE_HOME/conf/:$HBASE_HOME/hbase-client.jar:$HBASE_HOME/hbase-protocol.jar:$HBASE_HOME/lib/htrace-core.jar:$HBASE_HOME/lib/htrace-core-3.1.0-incubating.jar"&&
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条