分享

Spark 高级分析:第十二章第4-6节 Spark和数据科学家的工作流程

本帖最后由 feilong 于 2019-3-8 08:11 编辑

问题导读

1.数据科学家的工作流程是怎样的
2.Spark文件格式有哪些,如何使用
3.Spark有哪些子项目,分别是做什么用的




上一篇
Spark 高级分析:第十二章第1-3节 深入Spark
http://www.aboutyun.com/forum.php?mod=viewthread&tid=26746


第4节Spark和数据科学家的工作流程

在探索和尝试了解新数据集时,Spark的一些转换和操作尤其有用。其中一些算子使用随机性。在任务结果丢失并需要重新计算或多个操作利用同一个未缓存的RDD的情况下,这些运算符使用种子来保证确定性。

take可以以较低的成本查看RDD的前几个元素。如果前面没有需要无序排列的操作,则只需要计算第一个分区中的元素。
[mw_shl_code=scala,true]myFirstRdd.take(2)
14/09/29 12:09:13 INFO SparkContext: Starting job: take at <console>:15
...
14/09/29 12:09:13 INFO SparkContext: Job finished: take at <console>:15, took 0.329755931 s
res1: Array[Int] = Array(1, 2)[/mw_shl_code]
takeSample可用于将代表性数据样本拉入驱动程序中,以便在不同的环境(如R)中绘制图表、本地运行或导出以进行非分布式分析。其第一个参数with replacement决定样本是否可以包含同一记录的多个副本。
[mw_shl_code=scala,true]myFirstRdd.takeSample(true, 3)
14/09/29 12:14:18 INFO SparkContext: Starting job: takeSample at <console>:15
...
14/09/29 12:14:18 INFO SparkContext: Job finished: takeSample at <console>:15, took 0.051348012 s
res11: Array[Int] = Array(2, 1, 1)
myFirstRdd.takeSample(true, 5)
14/09/29 12:14:18 INFO SparkContext: Starting job: takeSample at <console>:15
...
14/09/29 12:14:18 INFO SparkContext: Job finished: takeSample at <console>:15, took 0.051348012 s
res11: Array[Int] = Array(2, 1, 1, 2, 4)
myFirstRdd.takeSample(false, 3)
14/09/29 12:14:18 INFO SparkContext: Starting job: takeSample at <console>:15
...
14/09/29 12:14:18 INFO SparkContext: Job finished: takeSample at <console>:15, took 0.051348012 s
res11: Array[Int] = Array(2, 1, 4)[/mw_shl_code]
top根据给定的顺序收集数据集中最大的K条记录。它在各种情况下都很有用,例如,在给每个记录打分后,检查得分最高的记录。相反的是takeOrder,它可以找到最小的记录。下面的代码片段生成介于0和100之间的随机数,并查找出现频率最高和最少的数字。
[mw_shl_code=scala,true]import scala.util.Random
val randNums = Seq.fill(10000)(Random.nextInt(100))
val numberCounts = sc.parallelize(randNums).map(x => (x, 1)).
reduceByKey(_ + _)
numCounts.top(3)(Ordering.by(_._2))
14/09/30 23:38:42 INFO SparkContext: Starting job: top at <console>:16
...
14/09/30 23:38:42 INFO SparkContext: Job finished: top at <console>:16, took 0.219709487 s
res6: Array[(Int, Int)] = Array((58,127), (25,120), (28,120))
numCounts.takeOrdered(3)(Ordering.by(_._2))
14/09/30 23:39:54 INFO SparkContext: Starting job: takeOrdered at <console>:16
...
14/09/30 23:39:54 INFO SparkContext: Job finished: takeOrdered at <console>:16, took 0.071684291 s
res7: Array[(Int, Int)] = Array((74,78), (92,79), (8,80))
[/mw_shl_code]
top函数首先以分布式方式在每个分区中找到最大的k,然后将这些k拖到驱动程序上,然后在所有分区中找到最大的k。当k很小时运行得很好,但当k大于或等于单个分区中数据的大小时,最终会将整个数据集拖到驱动程序上。对于这些情况,更明智的做法是使用sortByKey以分布式方式对整个数据集进行排序,然后取前k个元素。
[mw_shl_code=scala,true]numberCounts.map(_.swap).sortByKey().map(_.swap).take(5)
14/10/06 13:19:08 INFO SparkContext: Starting job: sortByKey at <console>:18
...
14/10/06 13:19:08 INFO DAGScheduler: Job 2 finished: take at <console>:18, took 0.086740 s
res3: Array[(Int, Int)] = Array((87,73), (19,76), (75,76), (25,81), (22,81))[/mw_shl_code]
上面的方法将数据拉入驱动程序,但通常采样对于创建分布式数据集很有用,因为这是管道中的一个步骤。示例通过对其父RDD进行采样来创建RDD。与takeSample一样,它可以在更换和不更换的情况下工作。它接受一个参数,该参数确定要作为父RDD大小的一部分进行采样的元素数。当进行替换采样时,Spark接受一个大于1的值,这对于数据集剧增以对管道进行压力测试很有用。样本对于排列数据也很有用,这是在像随机梯度下降这样的线性算法上运行之前的良好实践。
[mw_shl_code=scala,true]val bootstrapSample = rdd.sample(true, .6)
val permuted = rdd.sample(false, 1.0)[/mw_shl_code]
randomSpit返回多个RDD,组合起来就构成了它们的父级。它对于将数据拆分为训练集和测试集等任务特别有用。
[mw_shl_code=scala,true]fullData.cache()
val (train, test) = fullData.randomSplit(Array(0.6, 0.4))[/mw_shl_code]

第5节文件格式

Spark示例通常使用textFile,但通常建议以二进制格式存储大型数据集,这既可以减少空间,也可以强制执行键入。Avro和Parquet文件分别是标准的行和列格式,用于在Hadoop集群上存储数据。Avro还指来自这两种格式的Ondisk数据的内存表示。

下面的示例演示如何使用名称和常用颜色字段读取Avro字段。
[mw_shl_code=scala,true]import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.avro.generic.GenericRecord
import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.AvroKeyInputFormat
val conf = new Job()
FileInputFormat.setInputPaths(conf, inPaths)
val records = sc.newAPIHadoopRDD(conf.getConfiguration,
classOf[AvroKeyInputFormat[GenericRecord]],
classOf[AvroKey[GenericRecord]],
classOf[NullWritable]).map(_._1.datum)
val namesAndColors = records.map(x =>
(x.get("name"), x.get("favorite_color")))[/mw_shl_code]
类似地,对于Parquet:
[mw_shl_code=scala,true]import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.avro.generic.GenericRecord
import parquet.hadoop.ParquetInputFormat
val conf = new Job()
FileInputFormat.setInputPaths(conf, inPaths)
val records = sc.newAPIHadoopRDD(conf.getConfiguration,
classOf[ParquetInputFormat],
classOf[Void],
classOf[GenericRecord]).map(_._2)
val namesAndColors = records.map(x =>
(x.get("name"), x.get("favorite_color")))
[/mw_shl_code]
请注意,Avro支持两种内存表示:
&#8226;上面显示的Avro泛型将记录表示为从字符串键到对象值的映射。在探索新的数据集时,它们是最容易入门的,但也存在一些效率低下的问题,例如需要将基元类型包装到对象中。
&#8226;Avro特例使用代码生成来创建与Avro类型对应的Java类。为了简洁起见,这里省略了它们,但是与本书相关联的GitHub库包含一个示例。

第6节Spark子项目
Spark Core是指Spark的分布式执行引擎和上一章描述的核心Spark API。除了Spark Core,Spark还包含一系列子项目,这些子项目在其引擎之上提供了功能。这些子项目,详细如下,处于不同的发展阶段。虽然核心Spark API将保持稳定并保持兼容性,但标记为alpha或beta的子项目的API可能会发生变化。

MLlib
MLLIB提供了一套在Spark之上的机器学习算法。该项目旨在高质量地实现标准算法,重点是可维护性和广度上的一致性。撰写本文时,MLLIB支持以下功能:
图片1.png
MLlib将数据表示为矢量对象,这可能是稀疏的或密集的。里面有一些用于在表示局部的矩阵对象上操作的轻线性代数功能矩阵和RowMatrix对象,它们表示分布的向量集合。它依靠Scala线性代数库Breeze来布局和操作覆盖下的数据。
在编写本书时,mllib是一个beta组件,这意味着一些API可能在将来的版本中发生更改。
本书的几章利用了MLlib的算法:
&#8226;第3章使用MLlib的交替最小二乘法来提出建议。
&#8226;第4章使用MLlib的随机决策森林实现进行分类。
&#8226;第5章使用MLlib的k-means聚类实现进行异常检测。
&#8226;第6章使用MLlib的奇异值分解实现进行文本分析

Spark Streaming
Spark Streaming用于Spark执行引擎连续处理数据。Spark的典型批处理在大型数据集上同时执行作业时,Spark流旨在降低延迟(数百毫秒):当数据可用时,需要对其进行近实时的转换和处理。通过在小时间内积累的小批量数据上运行作业来激发流函数间隔。它对于快速警报、向仪表盘提供最新信息以及需要更复杂分析的情况非常有用。例如,异常检测中的一个常见用例是对一批数据运行k-means集群,如果集群中心偏离正常值,则触发警告。

Spark SQL
spark sql使用spark引擎来执行SQL查询——要么在HDFS中持久存储的数据集上,要么在现有RDD上。它允许在spark程序中使用SQL语句操作数据。
[mw_shl_code=scala,true]import org.apache.spark.sql.hive.HiveContext
val sqlContext = HiveContext(sc)
val schemaRdd = sqlContext.sql("FROM sometable SELECT column1, column2, column3")
schemaRdd.collect().foreach(println)[/mw_shl_code]
Spark SQL的核心数据结构是一个SchemaRDD,一个带有模式信息的RDD,为每一列提供名称和类型。可以通过用类型信息以编程方式注释现有RDD,或者访问存储在配置单元中的已存在的模式数据来创建模式RDD,如上图所示。

在撰写本书时,spark sql是一个alpha组件,这意味着它的一些API在未来的版本中可能会发生变化。

GraphX
Spark包含一个名为graphx的子项目,该子项目利用其引擎进行图形处理。在计算机科学中,“图”一词是指由一组边连接的顶点组成的结构。图形算法对于诸如检查社交网络中用户之间的连接、了解基于哪些页面链接到Internet上页面的重要性或运行依赖于实体之间连接结构的任何分析等任务都很有用。graphx表示带有一对RDD的图——顶点的RDD和边的RDD。它公开了一个类似于Google的pregel图形处理系统的API,并且只在少数几行代码中表示pagerank等常见算法。

在撰写本书时,Graphx是一个alpha组件,这意味着它的一些API在未来的版本中可能会发生变化。第6章利用了各种GraphX分析引文图的能力。






最新经典文章,欢迎关注公众号





已有(1)人评论

跳转到指定楼层
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条