Hi everyone, I ran into the following error while programming against Spark 1.3:
Exception in thread "main" java.lang.NoSuchFieldError: defaultVal
at org.apache.spark.sql.hive.HiveContext$$anonfun$newTemporaryConfiguration$2.apply(HiveContext.scala:613)
at org.apache.spark.sql.hive.HiveContext$$anonfun$newTemporaryConfiguration$2.apply(HiveContext.scala:611)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at org.apache.spark.sql.hive.HiveContext$.newTemporaryConfiguration(HiveContext.scala:611)
at org.apache.spark.sql.hive.HiveContext.org$apache$spark$sql$hive$HiveContext$$execOverrides$lzycompute(HiveContext.scala:161)
at org.apache.spark.sql.hive.HiveContext.org$apache$spark$sql$hive$HiveContext$$execOverrides(HiveContext.scala:161)
at org.apache.spark.sql.hive.HiveContext.makeDummyHive(HiveContext.scala:203)
at org.apache.spark.sql.hive.HiveContext.executionHive$lzycompute(HiveContext.scala:183)
at org.apache.spark.sql.hive.HiveContext.executionHive(HiveContext.scala:181)
at org.apache.spark.sql.hive.HiveContext$$anonfun$session$1.apply(HiveContext.scala:411)
at org.apache.spark.sql.hive.HiveContext$$anonfun$session$1.apply(HiveContext.scala:409)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.sql.hive.HiveContext.session(HiveContext.scala:409)
at org.apache.spark.sql.hive.HiveContext.hiveconf(HiveContext.scala:423)
at org.apache.spark.sql.hive.HiveContext.createSession(HiveContext.scala:474)
at org.apache.spark.sql.hive.HiveContext.createSession(HiveContext.scala:75)
at org.apache.spark.sql.SQLContext.openSession(SQLContext.scala:898)
at org.apache.spark.sql.SQLContext.defaultSession$lzycompute(SQLContext.scala:184)
at org.apache.spark.sql.SQLContext.defaultSession(SQLContext.scala:184)
at org.apache.spark.sql.SQLContext$$anon$3.initialValue(SQLContext.scala:180)
at org.apache.spark.sql.SQLContext$$anon$3.initialValue(SQLContext.scala:179)
at java.lang.ThreadLocal.setInitialValue(ThreadLocal.java:180)
at java.lang.ThreadLocal.get(ThreadLocal.java:170)
at org.apache.spark.sql.SQLContext.currentSession(SQLContext.scala:906)
at org.apache.spark.sql.hive.HiveContext.currentSession(HiveContext.scala:483)
at org.apache.spark.sql.hive.HiveContext$$anon$5.conf(HiveContext.scala:448)
at org.apache.spark.sql.catalyst.analysis.OverrideFunctionRegistry$class.$init$(FunctionRegistry.scala:37)
at org.apache.spark.sql.hive.HiveContext$$anon$5.<init>(HiveContext.scala:447)
at org.apache.spark.sql.hive.HiveContext.functionRegistry$lzycompute(HiveContext.scala:447)
at org.apache.spark.sql.hive.HiveContext.functionRegistry(HiveContext.scala:446)
at org.apache.spark.sql.hive.HiveContext.functionRegistry(HiveContext.scala:75)
at org.apache.spark.sql.UDFRegistration.<init>(UDFRegistration.scala:41)
at org.apache.spark.sql.SQLContext.<init>(SQLContext.scala:264)
at org.apache.spark.sql.hive.HiveContext.<init>(HiveContext.scala:75)
at org.apache.spark.sql.hive.HiveContext.<init>(HiveContext.scala:81)
at com.adtec.bigdata.sparksql.JavaSparkSqlDemo.main(JavaSparkSqlDemo.java:49)
There is very little material about Spark 1.3 online. Does anyone know what causes this?
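
From what I could find, a NoSuchFieldError at runtime usually means the JVM loaded a different version of a class than the one the calling bytecode (here, Spark's HiveContext) was compiled against; in this trace the missing field lives in HiveConf$ConfVars, so mixed Hive jars on the classpath seem like the first thing to rule out. Below is a small probe I put together to check which Hive jar actually gets loaded. The HiveConf$ConfVars name is the real inner class from hive-common; the probe itself is only a sketch and should be run with the same classpath as the job:

// Probe which jar supplies HiveConf$ConfVars and whether it has defaultVal.
// Pure reflection, so no Hive compile-time dependency is needed.
public class HiveConfProbe {
    public static void main(String[] args) throws Exception {
        Class<?> confVars = Class.forName("org.apache.hadoop.hive.conf.HiveConf$ConfVars");
        java.security.CodeSource src = confVars.getProtectionDomain().getCodeSource();
        System.out.println("ConfVars loaded from: "
                + (src == null ? "bootstrap classpath" : src.getLocation()));
        try {
            confVars.getDeclaredField("defaultVal");
            System.out.println("defaultVal field: present");
        } catch (NoSuchFieldException e) {
            System.out.println("defaultVal field: missing, so the loaded Hive version"
                    + " does not match what Spark was compiled against");
        }
    }
}

If the printed jar is a Hive version other than the one bundled with the Spark distribution in use, that mismatch would explain the error.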
Code:
package com.adtec.bigdata.sparksql;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.hive.HiveContext;
import org.apache.spark.storage.StorageLevel;
import com.adtec.bigdata.bean.CbLogPo;
import com.adtec.bigdata.common.HadoopSecurity;
import scala.Tuple2;
public class JavaSparkSqlDemo {

    private static Logger logger = Logger.getLogger(JavaSparkSqlDemo.class);

    public static void main(String[] args) {
        /*
         * if (args.length < 2) {
         *     System.err.println("Usage JavaSparkSqlDemo: <master> <tableName>");
         *     System.exit(1);
         * }
         */
        try {
            /*
             * String master = args[0].trim();
             * final String tableName = args[1];
             */
            String master = "local[2]";
            final String tableName = "cb_log";
            /*
             * SparkSession sparkSession = SparkSession.builder()
             *         .appName("JavaSparkSql").master(master).getOrCreate();
             */
            // .enableHiveSupport().getOrCreate();
            SparkConf sparkConf = new SparkConf().setAppName("JavaSparkSqlDemo").setMaster(master);
            SparkContext sc = new SparkContext(sparkConf);
            HiveContext hiveContext = new HiveContext(sc);
            HadoopSecurity hs = HadoopSecurity.getMe();
            // Declared final so the anonymous classes below can capture it (required before Java 8).
            // Note: Hadoop Configuration is not java.io.Serializable, so capturing it in a
            // Spark closure will typically fail with "Task not serializable".
            final Configuration conf = hs.login();
            DataFrame orders = hiveContext.sql(
                    "select log_bankid,log_logno,log_cstno,log_userno,log_bsncode,log_datetime,"
                            + "log_flowno,log_result,log_info,log_contextid,log_sid,log_type,"
                            + "log_ip,log_mac,log_timestamp,log_osversion from catm.cb_log");
            // Convert the DataFrame into a JavaPairRDD keyed by log_bankid.
            JavaPairRDD<String, CbLogPo> jpairorder = orders.toJavaRDD()
                    .mapToPair(new PairFunction<Row, String, CbLogPo>() {
                        @Override
                        public Tuple2<String, CbLogPo> call(Row row) throws Exception {
                            CbLogPo cbLog = new CbLogPo();
                            cbLog.setLogBankid(row.getString(0));
                            cbLog.setLogLogno(row.getString(1));
                            cbLog.setLogCstno(row.getString(2));
                            cbLog.setLogUserno(row.getString(3));
                            cbLog.setLogBsncode(row.getString(4));
                            cbLog.setLogDatetime(row.getString(5));
                            cbLog.setLogFlowno(row.getString(6));
                            cbLog.setLogResult(row.getString(7));
                            cbLog.setLogInfo(row.getString(8));
                            cbLog.setLogContextid(row.getString(9));
                            cbLog.setLogSid(row.getString(10));
                            cbLog.setLogType(row.getString(11));
                            cbLog.setLogIp(row.getString(12));
                            cbLog.setLogMac(row.getString(13));
                            cbLog.setLogTimestamp(row.getString(14));
                            cbLog.setLogOsversion(row.getString(15));
                            return new Tuple2<String, CbLogPo>(row.getString(0), cbLog);
                        }
                    });
            jpairorder.persist(StorageLevel.MEMORY_AND_DISK_SER());
            jpairorder.foreach(new VoidFunction<Tuple2<String, CbLogPo>>() {
                @Override
                public void call(Tuple2<String, CbLogPo> t) throws Exception {
                    insertData(t._2, conf, tableName);
                }
            });
            sc.stop();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Insert one record into HBase. Note: this opens and closes an HTable per record.
     *
     * @throws IOException
     */
    public static void insertData(CbLogPo cbLog, Configuration conf, String tableName) throws IOException {
        logger.info("start insert data ......");
        HTable table = new HTable(conf, tableName);
        Put put = new Put((cbLog.getLogLogno() + ":" + cbLog.getLogCstno()).getBytes());
        put.addColumn("cf".getBytes(), "logBankid".getBytes(), cbLog.getLogBankid().getBytes());
        put.addColumn("cf".getBytes(), "logLogno".getBytes(), cbLog.getLogLogno().getBytes());
        put.addColumn("cf".getBytes(), "logCstno".getBytes(), cbLog.getLogCstno().getBytes());
        put.addColumn("cf".getBytes(), "logUserno".getBytes(), cbLog.getLogUserno().getBytes());
        put.addColumn("cf".getBytes(), "logBsncode".getBytes(), cbLog.getLogBsncode().getBytes());
        put.addColumn("cf".getBytes(), "logDatetime".getBytes(), cbLog.getLogDatetime().getBytes());
        put.addColumn("cf".getBytes(), "logFlowno".getBytes(), cbLog.getLogFlowno().getBytes());
        put.addColumn("cf".getBytes(), "logResult".getBytes(), cbLog.getLogResult().getBytes());
        put.addColumn("cf".getBytes(), "logInfo".getBytes(), cbLog.getLogInfo().getBytes());
        put.addColumn("cf".getBytes(), "logContextid".getBytes(), cbLog.getLogContextid().getBytes());
        put.addColumn("cf".getBytes(), "logSid".getBytes(), cbLog.getLogSid().getBytes());
        put.addColumn("cf".getBytes(), "logType".getBytes(), cbLog.getLogType().getBytes());
        put.addColumn("cf".getBytes(), "logIp".getBytes(), cbLog.getLogIp().getBytes());
        put.addColumn("cf".getBytes(), "logMac".getBytes(), cbLog.getLogMac().getBytes());
        put.addColumn("cf".getBytes(), "logTimestamp".getBytes(), cbLog.getLogTimestamp().getBytes());
        put.addColumn("cf".getBytes(), "logOsversion".getBytes(), cbLog.getLogOsversion().getBytes());
        try {
            table.put(put);
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (null != table) {
                table.close();
            }
        }
        logger.info("end insert data ......");
    }
}
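
Side note on the write path, separate from the exception above: insertData opens a new HTable for every record, and the foreach closure captures the Hadoop Configuration, which is not java.io.Serializable, so this should fail with "Task not serializable" once it runs. Below is a per-partition sketch that avoids both; it assumes our HadoopSecurity.getMe().login() helper can be called again on the executor side, which I have not verified. It would replace the jpairorder.foreach(...) call above and needs import java.util.Iterator;:

// Sketch: one HTable per partition instead of one per record, and the
// Configuration is built on the executor instead of being captured.
jpairorder.foreachPartition(new VoidFunction<Iterator<Tuple2<String, CbLogPo>>>() {
    @Override
    public void call(Iterator<Tuple2<String, CbLogPo>> rows) throws Exception {
        // Assumption: our security helper can be re-initialized executor-side.
        Configuration conf = HadoopSecurity.getMe().login();
        HTable table = new HTable(conf, tableName);
        try {
            while (rows.hasNext()) {
                CbLogPo cbLog = rows.next()._2;
                Put put = new Put((cbLog.getLogLogno() + ":" + cbLog.getLogCstno()).getBytes());
                put.addColumn("cf".getBytes(), "logBankid".getBytes(), cbLog.getLogBankid().getBytes());
                // ... the remaining 15 columns exactly as in insertData above ...
                table.put(put);
            }
        } finally {
            table.close();
        }
    }
});

With this shape only tableName (a String) ends up in the serialized closure.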