5.1 动机
数据量可能大到无法放在一台机器中,这时就需要探索别的数据读取和保存的方法了
spark基于hadoop生态圈构建,因此可以通过Hadoop MapReduce所使用的InputFormat和OutPutFromat接口访问如下文件格式及存储系统:
* 文件格式与文件系统
文本文件、JSON、SequenceFile、protocal buffer
NFS、HDFS、Amazon S3等分布式文件系统
* spark SQL中结构化数据源
JSON、Hive
* 数据库与键值存储
Cassandra、Hbase、JDBC源
5.2 文件格式
Spark支持的一些常见格式:
格式名称 结构化 备注
文本文件 否 普通的文本文件,每行一条记录
JSON 半结构化 常见的基于文本的格式,半结构化;大多数库都要求每行一条记录
CSV 是 非常常见的基于文本的格式,通常在电子表格应用中使用
SequenceFiles 是 一种用于键值对数据的常见Hadoop 文件格式
Protocol buffers 是 一种快速、节约空间的跨语言格式
对象文件 是 用来将Spark 作业中的数据存储下来以让共享的代码读取。改变类的时候它会失效,因为它依赖于Java 序列化
5.2.1 文本文件
5.2.1.1 读取文本文件
文本文件读取为RDD时,输入的每一行都会成为RDD的一个元素。
也可以将多个完整的文本文件一次性读取为一个pair RDD,其中键是文件名,值是文件内容。
读取一个文本文件:val input = sc.textFile("文件名")
读取多个文本文件:
方法一:val input = sc.textFile("目录名而非文件名") //这里会把目录中的所有文件读入到RDD中
方法二:val input = sc.wholeTextFiles("目录名") //该方法会返回一个pair RDD,其中键是输入文件的文件名
举例说明之:
(1)准备数据源
hadoop@ubuntu1:~/userdata/files$ cat /home/hadoop/userdata/files/people1.txt
Michael,29
Andy,30
Justin,19
hadoop@ubuntu1:~/userdata/files$ cat /home/hadoop/userdata/files/people2.txt
Tom,10
Tonny,11
Bob,12
(2) textFile
找出两个文件中年龄最大的people
val input = sc.textFile("file:///home/hadoop/userdata/files/")
val result1 = input.map{x => x.split(",")(1).toInt}.max
val result2 = result1.filter(x => x._2 == result1)
分别找出两个文件中年龄最大的people
val input = sc.wholeTextFiles("file:///home/hadoop/userdata/files/")
scala> input.keys.collect //文件名作为key值
res0: Array[String] = Array(file:/home/hadoop/userdata/files/people1.txt, file:/home/hadoop/userdata/files/people2.txt)
scala> input.values.collect //文件内容作为values
res1: Array[String] = Array("Michael,29
Andy,30
Justin,19
", "Tom,10
Tonny,11
Bob,12
")
scala> input.mapValues(x => x.split("\n")).flatMapValues(x => x).mapValues(x => x.split(",")).mapValues(x => x(1).toInt).reduceByKey((x,y) => (if(x < y) y else x)).collect
res41: Array[(String, Int)] = Array((file:/home/hadoop/userdata/files/people1.txt,30), (file:/home/hadoop/userdata/files/people2.txt,12)) //分别找了两个文件中的最大值。
5.2.1.2. 保存文本文件
result.saveAsTextFile(outputFile)
5.2.2 JSON
JSON 是一种使用较广的半结构化数据格式。
读取JSON 数据的最简单的方式是将数据作为文本文件读取,然后使用JSON 解析器来对RDD 中的值进行映射操作。
另外也可以用JSON 序列化库来将数据转为字符串,然后将其写出去。
5.2.2.1. 读取JSON
将数据作为文本文件读取,然后对JSON 数据进行解析。
书中说道能够使用spark进行json 的读写,但是给出的例子使用的json转换的jar包是jackson的,还需要再次进行引入
下面则是一个spark内部已经有的基于scala的json处理的jar包 -- json4s包
下面引用的一个例子来自网页:http://blog.csdn.net/wild46cat/article/details/54174695
json文件内容如下:
hadoop@ubuntu1:~/userdata/files$ cat pandainfo.json
{"name":"Sparky The Bear", "lovesPandas":true}
{"name":"Sparky The Bear", "lovesPandas":true, "knows":{"friends": ["holden"]}}
编写一个对象读取并解析json文件
import org.json4s._
import org.json4s.jackson.JsonMethods._
import org.json4s.jackson.Serialization
object CC{
case class Person(name:String,lovesPandas:Boolean)
def my(){
implicit val formats = Serialization.formats(ShortTypeHints(List()))
val input = sc.textFile("file:///home/hadoop/userdata/files/pandainfo.json")
input.collect().foreach(x => println(x))
val first = input.take(1)(0)
println(first)
println(first.getClass)
val p = parse(first).extract[Person] //解析json内容
println(p.name)
println(p.lovesPandas)
println("==========")
input.collect().foreach(x => {var c = parse(x).extract[Person];println(c.name + "," + c.lovesPandas)})
}
}
如果json文件是下面这个,那么在解析第二行的时候报这个:org.json4s.package$MappingException: No usable value for lovesPandas
目前尚未找到合理的办法解决这个
root@ubuntu1:/home/hadoop/userdata/files# cat pandainfo2.json
{"name":"Sparky The Bear", "lovesPandas":true}
{"name": "Holden"}
{"name":"Sparky The Bear", "lovesPandas":true, "knows":{"friends": ["holden"]}}
5.2.2.2 保存JSON
字符串RDD 转为解析好的JSON 数据的库,将由结构化数据组成的RDD 转为字符串RDD,然后使用Spark 的文本文件API 写出去。
scala> result.filter(p => P.lovesPandas).map(mapper.writeValueAsString(_)).saveAsTextFile(outputFile)
5.2.3 逗号分隔值与制表符分隔值
就是每行固定数目的字段按照逗号分隔的文本文件
逗号分隔值(CSV)文件每行都有固定数目的字段,字段间用逗号隔开(在制表符分隔值文件,即TSV 文件中用制表符隔开)。
5.2.3.1 读取CSV
使用textFile() 读取CSV
import java.io.StringReader
import au.com.bytecode.opencsv.CSVReader
val input = sc.textFile("file:///home/hadoop/userdata/files/favourite_animals.csv")
val result = input.map{ line =>
val reader = new CSVReader(new StringReader(line));
reader.readNext();
}
result.collect
res1: Array[Array[String]] = Array(Array(holden, panda), Array(notholden, notpanda), Array(spark, bear)) //输出结果
在Scala 中完整读取CSV ,下面这段代码运行时没成功,语法有问题,没排查出来。
object BasicParseCsv{
case class Person(name: String, favouriteAnimal: String)
def my()={
val input = sc.textFile("file:///home/hadoop/userdata/files/favourite_animals.csv")
val result = input.map{ line =>
val reader = new CSVReader(new StringReader(line));
reader.readNext();
}
val people = result.map(x => Person(x(0), x(1)))
val pandaLovers = people.filter(person => person.favouriteAnimal == "panda")
pandaLovers.map(person => List(person.name, person.favouriteAnimal).toArray).mapPartitions{people =>
val stringWriter = new StringWriter();
val csvWriter = new CSVWriter(stringWriter);
csvWriter.writeAll(people.toList)
Iterator(stringWriter.toString)
}.saveAsTextFile(file:///home/hadoop/userdata/files/favourite_animals_result.csv)
}
}
5.2.3.2 保存CSV
通过重用输出编码器来加速。
5.2.4 sequenceFile
保存sequenceFile
val rdd = sc.parallelize(List(("Panda",3),("Kay",6),("Snail",2)))
rdd.saveAsSequenceFile("file:///home/hadoop/userdata/files/sequenceOut")
$> hdfs dfs -text file:///home/hadoop/userdata/files/sequenceOut/part-0000* //还原sequencefile,每个worker为生成一个part,
读取sequenceFile
import import org.apache.hadoop.io._ //在eclipse里面搜索IntWritable类在哪个包下
val data = sc.sequenceFile("file:///home/hadoop/userdata/files/sequenceOut/part-00000",classOf[Text],classOf[IntWritable]) //这里得导入hadoop的一些类
scala>data.map{case (x, y) => (x.toString, y.get())}.collect
res1: Array[(String, Int)] = Array((Panda,3), (Kay,6), (Snail,2))
5.2.5 对象文件
val rdd = sc.parallelize(List(("Panda",3),("Kay",6),("Snail",2)))
rdd.saveAsObjectFile("file:///home/hadoop/userdata/files/objectOut")
$> hdfs dfs -text file:///home/hadoop/userdata/files/objectOut/part-0000*
5.2.6 Hadoop输入输出格式
5.2.6.1. 读取其他Hadoop输入格式
除了spark封装的格式之外,spark也可以和任何hadoop支持的格式交互。
在Scala 中使用老式API(新式是newAPIHadoopFile)读取hadoop最简单的输入格式:KeyValueTextInputFormat()
val input = sc.hadoopFile[Text, Text, KeyValueTextInputFormat](inputFile).map{
case (x, y) => (x.toString, y.toString) //这里每一行会被独立处理,键值之间用制表符隔开。
}
读取API需要接受三个参数,第一个是key值类型,第二是value类型,第三个是输入格式类型。如果需设定额外的hadoop配置
属性,也可以传入一个conf对象。
5.2.6.2. 保存hadoop输出格式
使用旧式接口(saveAsHadoopFile)和新接口(saveAsNewAPIHadoopFile)的调用方法是类似的
在Java中使用旧式API保存SequenceFile
public static class ConvertToWritableTypes implements
PairFunction<Tuple2<String, Integer>, Text, IntWritable> {
public Tuple2<Text, IntWritable> call(Tuple2<String, Integer> record) {
return new Tuple2(new Text(record._1), new IntWritable(record._2));
}
}
JavaPairRDD<String, Integer> rdd = sc.parallelizePairs(input);
JavaPairRDD<Text, IntWritable> result = rdd.mapToPair(new ConvertToWritableTypes());
result.saveAsHadoopFile(fileName, Text.class, IntWritable.class,
SequenceFileOutputFormat.class);
5.2.6.3. 非文件系统数据源
除了hadoopFile() 和saveAsHadoopFile() 这一大类函数, 还可以使用hadoopDataset/
saveAsHadoopDataSet 和newAPIHadoopDataset/saveAsNewAPIHadoopDataset 来访问Hadoop 所
支持的非文件系统的存储格式。。例如,许多像HBase 和MongoDB 这样的键值对存储都提
供了用来直接读取Hadoop 输入格式的接口。
5.2.6.4. 示例:protocol buffer
略
5.2.7 文件压缩
spark压缩选项只适用于支持压缩的Hadoop格式
Spark 原生的输入方式(textFile 和sequenceFile)可以自动处理一些类型的压缩。在读
取压缩后的数据时,一些压缩编解码器可以推测压缩类型。
5.3 文件系统
5.3.1 本地常规文件系统:
读取本地一个压缩文件
val rdd = sc.textFile("file:///home/holden/happypandas.gz")
5.3.2 Amazon S3
略
5.3.3 HDFS
hdfs://master:port/path
5.4 Spark SQL中的结构化数据
Spark SQL 是在Spark 1.0 中新加入Spark 的组件,并快速成为了Spark 中较受欢迎的操作
结构化和半结构化数据的方式
5.4.1 Apache Hive
Spark SQL 可以读取Hive 支持的任何表。
Spark SQL 连接到已有的Hive,需要将hive-site.xml 文件复制到Spark 的./conf/ 目录下。
用Scala 创建HiveContext 并查询数据
import org.apache.spark.sql.hive.HiveContext
val hiveCtx = new org.apache.spark.sql.hive.HiveContext(sc)
val rows = hiveCtx.sql("SELECT name, age FROM users")
val firstRow = rows.first()
println(firstRow.getString(0)) // 字段0是name字段
5.4.2 JSON
如果你有记录间结构一致的JSON 数据,Spark SQL 也可以自动推断出它们的结构信息,
并将这些数据读取为记录,这样就可以使得提取字段的操作变得很简单。
JSON 中的示例推文:
{"user": {"name": "Holden", "location": "San Francisco"}, "text": "Nice day out today"}
{"user": {"name": "Matei", "location": "Berkeley"}, "text": "Even nicer here :)"}
在Scala 中使用Spark SQL 读取JSON 数据:
val tweets = hiveCtx.jsonFile("tweets.json")
tweets.registerTempTable("tweets")
val results = hiveCtx.sql("SELECT user.name, text FROM tweets")
5.5 数据库
Spark 可以从任何支持Java 数据库连接(JDBC) 的关系型数据库中读取数据。
要访问这些数据, 需要构建一个org.apache.spark.rdd.JdbcRDD,将SparkContext 和其他参数一起传给它。
5.5.1 Java数据库连接
Scala 中的JdbcRDD访问MYSQL
def createConnection() = {
Class.forName("com.mysql.jdbc.Driver").newInstance();
DriverManager.getConnection("jdbc:mysql://localhost/test?user=holden");
}
def extractValues(r: ResultSet) = {
(r.getInt(1), r.getString(2))
}
val data = new JdbcRDD(sc,
createConnection, "SELECT * FROM panda WHERE ? <= id AND id <= ?",
lowerBound = 1, upperBound = 3, numPartitions = 2, mapRow = extractValues)
println(data.collect().toList)
5.5.2 Cassandra
略
5.5.3 HBase
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
val conf = HBaseConfiguration.create()
conf.set(TableInputFormat.INPUT_TABLE, "tablename") // 扫描哪张表
val rdd = sc.newAPIHadoopRDD( //将一张表的内容赋给rdd
conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable],classOf[Result])
5.5.4 Elasticsearch
略
5.6 总结
略