分享

Spark技术实战之3 -- 利用Spark将json文件导入Cassandra

xioaxu790 2014-10-19 11:42:36 发表于 连载型 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 0 18619
问题导读
1、利用Spark将json文件导入Cassandra的前提条件有哪些?
2、本次实验需要结合哪些知识?
3、进行数据导入时,需要注意什么?




本文接上一篇:Spark技术实战之2 -- Spark Cassandra Connector的安装和使用

概要
本文简要介绍如何使用spark-cassandra-connector将json文件导入到cassandra数据库,这是一个使用spark的综合性示例。


前提条件
假设已经阅读技术实战之3,并安装了如下软件
jdk
scala
sbt
cassandra
spark-cassandra-connector

实验目的
将存在于json文件中的数据导入到cassandra数据库,目前由cassandra提供的官方工具是json2sstable,由于对cassandra本身了解不多,这个我还没有尝试成功。

但想到spark sql中可以读取json文件,而spark-cassadra-connector又提供了将RDD存入到数据库的功能,我想是否可以将两者结合一下。

创建KeySpace和Table
为了减少复杂性,继续使用实战3中的keyspace和table,
  1. CREATE KEYSPACE test WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1 };
  2. CREATE TABLE test.kv(key text PRIMARY KEY, value int);
复制代码


启动spark-shell
与实战3中描述一致。
  1. bin/spark-shell --driver-class-path /root/working/spark-cassandra-connector/spark-cassandra-connector/target/scala-2.10/spark-cassandra-connector_2.10-1.1.0-SNAPSHOT.jar:/root/.ivy2/cache/org.apache.cassandra/cassandra-thrift/jars/cassandra-thrift-2.0.9.jar:/root/.ivy2/cache/org.apache.thrift/libthrift/jars/libthrift-0.9.1.jar:/root/.ivy2/cache/org.apache.cassandra/cassandra-clientutil/jars/cassandra-clientutil-2.0.9.jar:/root/.ivy2/cache/com.datastax.cassandra/cassandra-driver-core/jars/cassandra-driver-core-2.0.4.jar:/root/.ivy2/cache/io.netty/netty/bundles/netty-3.9.0.Final.jar:/root/.ivy2/cache/com.codahale.metrics/metrics-core/bundles/metrics-core-3.0.2.jar:/root/.ivy2/cache/org.slf4j/slf4j-api/jars/slf4j-api-1.7.7.jar:/root/.ivy2/cache/org.apache.commons/commons-lang3/jars/commons-lang3-3.3.2.jar:/root/.ivy2/cache/org.joda/joda-convert/jars/joda-convert-1.2.jar:/root/.ivy2/cache/joda-time/joda-time/jars/joda-time-2.3.jar:/root/.ivy2/cache/org.apache.cassandra/cassandra-all/jars/cassandra-all-2.0.9.jar:/root/.ivy2/cache/org.slf4j/slf4j-log4j12/jars/slf4j-log4j12-1.7.2.jar
复制代码


准备json文件
以spark自带的person.json文件为例,内容如下所示
  1. {"name":"Andy", "age":30}
  2. {"name":"Justin", "age":19}
复制代码



数据导入
假设person.json文件存储在$SPARK_HOME目录,在启动spark-shell之后,执行如下语句
  1. sc.stop
  2. import com.datastax.spark.connector._
  3. import org.apache.spark._
  4. val conf = new SparkConf()
  5. conf.set("spark.cassandra.connection.host", "127.0.0.1")
  6. val sc = new SparkContext("local[2]", "Cassandra Connector Test", conf)
  7. val sqlContext = new org.apache.spark.sql.SQLContext(sc)
  8. val path = "./people.json"
  9. val people = sqlContext.jsonFile(path)
  10. people.map(p=>(p.getString(10),p.getInt(0)))
  11.       .saveToCassandra("test","kv",SomeColumns("key","value"))
复制代码



注意:
1、jsonFile返回的是jsonRDD,其中每一个成员是Row类型,并不行直接将saveToCassandra作用于jsonRDD,需要先作一步转换即map过程
2、map中使用到的getXXX函数是在事先已知数据类型的情况下取出其值
3、最后saveToCassandra触发数据的存储过程
另外一个地方值得记录一下,如果在cassandra中创建的表使用了uuid作为primary key,在scala中使用如下函数来生成uuid
  1. import java.util.UUID
  2. UUID.randomUUID
复制代码



验证步骤
使用cqlsh来查看数据是否已经真正的写入到test.kv表中。

小结
本次实验结合了以下知识
spark sql
spark RDD的转换函数
spark-cassandra-connector

相关文章

Spark技术实战之1 -- KafkaWordCount
http://www.aboutyun.com/thread-9580-1-1.html

Spark技术实战之2 -- Spark Cassandra Connector的安装和使用
http://www.aboutyun.com/thread-9582-1-1.html


Apache Spark技术实战之4 -- SparkR的安装及使用
http://www.aboutyun.com/thread-10082-1-1.html

Apache Spark技术实战之5 -- spark-submit常见问题及其解决
http://www.aboutyun.com/thread-10083-1-1.html

Apache Spark技术实战之6 -- CassandraRDD高并发数据读取实现剖析
http://www.aboutyun.com/thread-10084-1-1.html




没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条