立即注册 登录
About云-梭伦科技 返回首页

一颗银杏树的个人空间 https://www.aboutyun.com/?53709 [收藏] [复制] [分享] [RSS]

日志

《spark快速数据分析》读书笔记之第五章:数据读取与保存

已有 1505 次阅读2017-5-22 14:46 |个人分类:spark| spark, 读取, 输出, 文本文件, SJON

5.1 动机
    数据量可能大到无法放在一台机器中,这时就需要探索别的数据读取和保存的方法了
    spark基于hadoop生态圈构建,因此可以通过Hadoop MapReduce所使用的InputFormat和OutPutFromat接口访问如下文件格式及存储系统:
    * 文件格式与文件系统
    文本文件、JSON、SequenceFile、protocal buffer
    NFS、HDFS、Amazon S3等分布式文件系统
    * spark SQL中结构化数据源
    JSON、Hive
    * 数据库与键值存储
    Cassandra、Hbase、JDBC源
   
5.2 文件格式
    Spark支持的一些常见格式:
    格式名称        结构化    备注
    文本文件          否       普通的文本文件,每行一条记录
    JSON           半结构化    常见的基于文本的格式,半结构化;大多数库都要求每行一条记录
    CSV               是       非常常见的基于文本的格式,通常在电子表格应用中使用
    SequenceFiles     是       一种用于键值对数据的常见Hadoop 文件格式
    Protocol buffers  是       一种快速、节约空间的跨语言格式
    对象文件          是       用来将Spark 作业中的数据存储下来以让共享的代码读取。改变类的时候它会失效,因为它依赖于Java 序列化
    
    5.2.1  文本文件
        5.2.1.1 读取文本文件
        文本文件读取为RDD时,输入的每一行都会成为RDD的一个元素。
        也可以将多个完整的文本文件一次性读取为一个pair RDD,其中键是文件名,值是文件内容。
        
        读取一个文本文件:val input = sc.textFile("文件名")
        读取多个文本文件:
        方法一:val input = sc.textFile("目录名而非文件名")        //这里会把目录中的所有文件读入到RDD中
        方法二:val input = sc.wholeTextFiles("目录名")            //该方法会返回一个pair RDD,其中键是输入文件的文件名
       
        举例说明之:
        (1)准备数据源
        hadoop@ubuntu1:~/userdata/files$ cat /home/hadoop/userdata/files/people1.txt 
        Michael,29
        Andy,30
        Justin,19
        hadoop@ubuntu1:~/userdata/files$ cat /home/hadoop/userdata/files/people2.txt 
        Tom,10
        Tonny,11
        Bob,12
        
        (2) textFile
        找出两个文件中年龄最大的people
        val input = sc.textFile("file:///home/hadoop/userdata/files/")
        val result1 = input.map{x => x.split(",")(1).toInt}.max
        val result2 = result1.filter(x => x._2 == result1)
        
        分别找出两个文件中年龄最大的people
        val input = sc.wholeTextFiles("file:///home/hadoop/userdata/files/")   
        scala> input.keys.collect           //文件名作为key值
        res0: Array[String] = Array(file:/home/hadoop/userdata/files/people1.txt, file:/home/hadoop/userdata/files/people2.txt)
        scala> input.values.collect         //文件内容作为values          
        res1: Array[String] = Array("Michael,29
        Andy,30
        Justin,19
        ", "Tom,10
        Tonny,11
        Bob,12
        ")
        scala> input.mapValues(x => x.split("\n")).flatMapValues(x => x).mapValues(x => x.split(",")).mapValues(x => x(1).toInt).reduceByKey((x,y) => (if(x < y) y else x)).collect
        res41: Array[(String, Int)] = Array((file:/home/hadoop/userdata/files/people1.txt,30), (file:/home/hadoop/userdata/files/people2.txt,12))  //分别找了两个文件中的最大值。
        
        5.2.1.2. 保存文本文件
        result.saveAsTextFile(outputFile)
        
    5.2.2 JSON
        JSON 是一种使用较广的半结构化数据格式。
        读取JSON 数据的最简单的方式是将数据作为文本文件读取,然后使用JSON 解析器来对RDD 中的值进行映射操作。
        另外也可以用JSON 序列化库来将数据转为字符串,然后将其写出去。
        
        5.2.2.1. 读取JSON
        将数据作为文本文件读取,然后对JSON 数据进行解析。
        书中说道能够使用spark进行json 的读写,但是给出的例子使用的json转换的jar包是jackson的,还需要再次进行引入
        下面则是一个spark内部已经有的基于scala的json处理的jar包 -- json4s包
        下面引用的一个例子来自网页:http://blog.csdn.net/wild46cat/article/details/54174695
        
        json文件内容如下:
        hadoop@ubuntu1:~/userdata/files$ cat pandainfo.json 
        {"name":"Sparky The Bear", "lovesPandas":true}
        {"name":"Sparky The Bear", "lovesPandas":true, "knows":{"friends": ["holden"]}}
        
        编写一个对象读取并解析json文件
        import org.json4s._
        import org.json4s.jackson.JsonMethods._  
        import org.json4s.jackson.Serialization  
        object CC{  
        case class Person(name:String,lovesPandas:Boolean)
        def my(){  
        implicit val formats = Serialization.formats(ShortTypeHints(List()))
        val input = sc.textFile("file:///home/hadoop/userdata/files/pandainfo.json")
        input.collect().foreach(x => println(x))
        val first = input.take(1)(0)
        println(first)
        println(first.getClass)
        val p = parse(first).extract[Person]   //解析json内容
        println(p.name)
        println(p.lovesPandas)
        println("==========")
        input.collect().foreach(x => {var c = parse(x).extract[Person];println(c.name + "," + c.lovesPandas)})  
        }
        }
        
        如果json文件是下面这个,那么在解析第二行的时候报这个:org.json4s.package$MappingException: No usable value for lovesPandas
        目前尚未找到合理的办法解决这个
        root@ubuntu1:/home/hadoop/userdata/files# cat pandainfo2.json 
        {"name":"Sparky The Bear", "lovesPandas":true}
        {"name": "Holden"}
        {"name":"Sparky The Bear", "lovesPandas":true, "knows":{"friends": ["holden"]}}
        5.2.2.2 保存JSON
        字符串RDD 转为解析好的JSON 数据的库,将由结构化数据组成的RDD 转为字符串RDD,然后使用Spark 的文本文件API 写出去。
        scala> result.filter(p => P.lovesPandas).map(mapper.writeValueAsString(_)).saveAsTextFile(outputFile)
        
    5.2.3 逗号分隔值与制表符分隔值
        就是每行固定数目的字段按照逗号分隔的文本文件
        逗号分隔值(CSV)文件每行都有固定数目的字段,字段间用逗号隔开(在制表符分隔值文件,即TSV 文件中用制表符隔开)。
        
        5.2.3.1 读取CSV
        使用textFile() 读取CSV
        import java.io.StringReader
        import au.com.bytecode.opencsv.CSVReader
        val input = sc.textFile("file:///home/hadoop/userdata/files/favourite_animals.csv")
        val result = input.map{ line =>
        val reader = new CSVReader(new StringReader(line));
        reader.readNext();
        }
        result.collect
        res1: Array[Array[String]] = Array(Array(holden, panda), Array(notholden, notpanda), Array(spark, bear)) //输出结果
        
        在Scala 中完整读取CSV ,下面这段代码运行时没成功,语法有问题,没排查出来。
        object BasicParseCsv{
          case class Person(name: String, favouriteAnimal: String)
          def my()={
        val input = sc.textFile("file:///home/hadoop/userdata/files/favourite_animals.csv")
        val result = input.map{ line =>
        val reader = new CSVReader(new StringReader(line));
         reader.readNext();
        }
        val people = result.map(x => Person(x(0), x(1)))
        val pandaLovers = people.filter(person => person.favouriteAnimal == "panda")
        pandaLovers.map(person => List(person.name, person.favouriteAnimal).toArray).mapPartitions{people =>
         val stringWriter = new StringWriter();
         val csvWriter = new CSVWriter(stringWriter);
         csvWriter.writeAll(people.toList)
         Iterator(stringWriter.toString)
        }.saveAsTextFile(file:///home/hadoop/userdata/files/favourite_animals_result.csv)   
          }
        }
        5.2.3.2 保存CSV
        通过重用输出编码器来加速。
        
    5.2.4 sequenceFile
        
        保存sequenceFile
        val rdd = sc.parallelize(List(("Panda",3),("Kay",6),("Snail",2)))
        rdd.saveAsSequenceFile("file:///home/hadoop/userdata/files/sequenceOut")
        $> hdfs dfs -text file:///home/hadoop/userdata/files/sequenceOut/part-0000* //还原sequencefile,每个worker为生成一个part,
        
        读取sequenceFile
        import import org.apache.hadoop.io._    //在eclipse里面搜索IntWritable类在哪个包下
        val data = sc.sequenceFile("file:///home/hadoop/userdata/files/sequenceOut/part-00000",classOf[Text],classOf[IntWritable])  //这里得导入hadoop的一些类
        scala>data.map{case (x, y) => (x.toString, y.get())}.collect
        res1: Array[(String, Int)] = Array((Panda,3), (Kay,6), (Snail,2))
        
    5.2.5 对象文件
        val rdd = sc.parallelize(List(("Panda",3),("Kay",6),("Snail",2)))
        rdd.saveAsObjectFile("file:///home/hadoop/userdata/files/objectOut")
        $> hdfs dfs -text file:///home/hadoop/userdata/files/objectOut/part-0000*
             
    5.2.6 Hadoop输入输出格式
        5.2.6.1. 读取其他Hadoop输入格式
        除了spark封装的格式之外,spark也可以和任何hadoop支持的格式交互。
        在Scala 中使用老式API(新式是newAPIHadoopFile)读取hadoop最简单的输入格式:KeyValueTextInputFormat()
        val input = sc.hadoopFile[Text, Text, KeyValueTextInputFormat](inputFile).map{
        case (x, y) => (x.toString, y.toString)   //这里每一行会被独立处理,键值之间用制表符隔开。
        }
        读取API需要接受三个参数,第一个是key值类型,第二是value类型,第三个是输入格式类型。如果需设定额外的hadoop配置
        属性,也可以传入一个conf对象。
        
        5.2.6.2. 保存hadoop输出格式
        使用旧式接口(saveAsHadoopFile)和新接口(saveAsNewAPIHadoopFile)的调用方法是类似的
        在Java中使用旧式API保存SequenceFile
        public static class ConvertToWritableTypes implements
        PairFunction<Tuple2<String, Integer>, Text, IntWritable> {
        public Tuple2<Text, IntWritable> call(Tuple2<String, Integer> record) {
        return new Tuple2(new Text(record._1), new IntWritable(record._2));
        }
        }
        JavaPairRDD<String, Integer> rdd = sc.parallelizePairs(input);
        JavaPairRDD<Text, IntWritable> result = rdd.mapToPair(new ConvertToWritableTypes());
        result.saveAsHadoopFile(fileName, Text.class, IntWritable.class,
        SequenceFileOutputFormat.class);
        
        5.2.6.3. 非文件系统数据源
        除了hadoopFile() 和saveAsHadoopFile() 这一大类函数, 还可以使用hadoopDataset/
        saveAsHadoopDataSet 和newAPIHadoopDataset/saveAsNewAPIHadoopDataset 来访问Hadoop 所
        支持的非文件系统的存储格式。。例如,许多像HBase 和MongoDB 这样的键值对存储都提
        供了用来直接读取Hadoop 输入格式的接口。
       
        5.2.6.4. 示例:protocol buffer
       
        
     5.2.7 文件压缩
        spark压缩选项只适用于支持压缩的Hadoop格式
        Spark 原生的输入方式(textFile 和sequenceFile)可以自动处理一些类型的压缩。在读
        取压缩后的数据时,一些压缩编解码器可以推测压缩类型。

5.3 文件系统
     5.3.1 本地常规文件系统:
        读取本地一个压缩文件
        val rdd = sc.textFile("file:///home/holden/happypandas.gz")
     5.3.2 Amazon S3
        略
     5.3.3 HDFS
        hdfs://master:port/path
     
5.4 Spark SQL中的结构化数据
     Spark SQL 是在Spark 1.0 中新加入Spark 的组件,并快速成为了Spark 中较受欢迎的操作
     结构化和半结构化数据的方式
     
     
     5.4.1 Apache Hive
     Spark SQL 可以读取Hive 支持的任何表。
     Spark SQL 连接到已有的Hive,需要将hive-site.xml 文件复制到Spark 的./conf/ 目录下。
     
     用Scala 创建HiveContext 并查询数据
     import org.apache.spark.sql.hive.HiveContext
     val hiveCtx = new org.apache.spark.sql.hive.HiveContext(sc)
     val rows = hiveCtx.sql("SELECT name, age FROM users")
     val firstRow = rows.first()
     println(firstRow.getString(0)) // 字段0是name字段   
     
     5.4.2 JSON
     如果你有记录间结构一致的JSON 数据,Spark SQL 也可以自动推断出它们的结构信息,
     并将这些数据读取为记录,这样就可以使得提取字段的操作变得很简单。
     
     JSON 中的示例推文:
     {"user": {"name": "Holden", "location": "San Francisco"}, "text": "Nice day out today"}
     {"user": {"name": "Matei", "location": "Berkeley"}, "text": "Even nicer here :)"}
     在Scala 中使用Spark SQL 读取JSON 数据:
     val tweets = hiveCtx.jsonFile("tweets.json")
     tweets.registerTempTable("tweets")
     val results = hiveCtx.sql("SELECT user.name, text FROM tweets")
5.5 数据库
     Spark 可以从任何支持Java 数据库连接(JDBC) 的关系型数据库中读取数据。
     要访问这些数据, 需要构建一个org.apache.spark.rdd.JdbcRDD,将SparkContext 和其他参数一起传给它。
     5.5.1 Java数据库连接
     Scala 中的JdbcRDD访问MYSQL
     def createConnection() = {
     Class.forName("com.mysql.jdbc.Driver").newInstance();
     DriverManager.getConnection("jdbc:mysql://localhost/test?user=holden");
     }
     def extractValues(r: ResultSet) = {
     (r.getInt(1), r.getString(2))
     }
     val data = new JdbcRDD(sc,
     createConnection, "SELECT * FROM panda WHERE ? <= id AND id <= ?",
     lowerBound = 1, upperBound = 3, numPartitions = 2, mapRow = extractValues)
     println(data.collect().toList)
     
     5.5.2 Cassandra
     略
       
     5.5.3 HBase
     import org.apache.hadoop.hbase.HBaseConfiguration
     import org.apache.hadoop.hbase.client.Result
     import org.apache.hadoop.hbase.io.ImmutableBytesWritable
     import org.apache.hadoop.hbase.mapreduce.TableInputFormat
     val conf = HBaseConfiguration.create()
     conf.set(TableInputFormat.INPUT_TABLE, "tablename") // 扫描哪张表
     val rdd = sc.newAPIHadoopRDD(           //将一张表的内容赋给rdd
     conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable],classOf[Result])
     
     5.5.4 Elasticsearch
     略
     
5.6 总结
     略

路过

雷人

握手

鲜花

鸡蛋

评论 (0 个评论)

facelist doodle 涂鸦板

您需要登录后才可以评论 登录 | 立即注册

关闭

推荐上一条 /2 下一条